In [1]:
import json
import uuid
import os
import json
import time
from collections import namedtuple
import heapq
import pandas as pd
import s3fs
import pyarrow.parquet as pq
import threading
import datetime  as dt
import json

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

### Configuration Parameters 

> **TODO:** Change the configuration parameters to the appropriate values for your setup.

In [2]:
# Connection and identity settings shared by every cell below.
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Adam',
    last_name='Curry'
)

# Both the Kafka client id and the topic prefix are "<LastName><FirstName>".
config.update(
    client_id='{last_name}{first_name}'.format(**config),
    topic_prefix='{last_name}{first_name}'.format(**config),
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Adam',
 'last_name': 'Curry',
 'client_id': 'CurryAdam',
 'topic_prefix': 'CurryAdam'}

In [7]:
# Anonymous (unauthenticated) S3 access against the S3-compatible endpoint below.
endpoint_url = 'https://storage.budsc.midwest-datascience.com'
s3 = s3fs.S3FileSystem(anon=True, client_kwargs={'endpoint_url': endpoint_url})

# Columns kept from the accelerations dataset, in Acceleration field order.
# The 't' column is deliberately excluded.
acceleration_columns = ['offset', 'id', 'ride_id', 'uuid', 'x', 'y', 'z']
Acceleration = namedtuple('Acceleration', acceleration_columns)
def read_accelerations():
    """Load the processed BDD acceleration records from S3, ordered by ``offset``.

    Reads the Parquet dataset at ``s3://data/processed/bdd/accelerations``
    through the module-level ``s3`` filesystem, keeps only
    ``acceleration_columns`` and sorts ascending by ``offset``.

    Returns:
        list[Acceleration]: one namedtuple per row. For NumPy-backed columns
        the field values are native Python scalars rather than NumPy scalars,
        so ``record._asdict()`` can be handed to the JSON-serializing producer.
    """
    df = pq.ParquetDataset(
        's3://data/processed/bdd/accelerations',
        filesystem=s3
    ).read_pandas().to_pandas()

    df = df[acceleration_columns].sort_values(by=['offset'])

    # DataFrame.to_records() yields NumPy scalars (e.g. numpy.int64), which
    # json.dumps rejects; itertuples() yields native Python scalars instead.
    return [
        Acceleration._make(row)
        for row in df.itertuples(index=False, name=None)
    ]


accelerations = read_accelerations()

# Columns kept from the locations dataset, in Location field order.
# The 't' column is deliberately excluded.
location_columns = [
    'offset', 'id', 'ride_id', 'uuid',
    'course', 'latitude', 'longitude', 'geohash',
    'speed', 'accuracy',
]
Location = namedtuple('Location', location_columns)
def read_locations():
    """Load the processed BDD location records from S3, ordered by ``offset``.

    Reads the Parquet dataset at ``s3://data/processed/bdd/locations``
    through the module-level ``s3`` filesystem, keeps only
    ``location_columns`` and sorts ascending by ``offset``.

    Returns:
        list[Location]: one namedtuple per row. For NumPy-backed columns the
        field values are native Python scalars rather than NumPy scalars, so
        ``record._asdict()`` can be handed to the JSON-serializing producer.
    """
    df = pq.ParquetDataset(
        's3://data/processed/bdd/locations',
        filesystem=s3
    ).read_pandas().to_pandas()

    df = df[location_columns].sort_values(by=['offset'])

    # DataFrame.to_records() yields NumPy scalars (e.g. numpy.int64), which
    # json.dumps rejects; itertuples() yields native Python scalars instead.
    return [
        Location._make(row)
        for row in df.itertuples(index=False, name=None)
    ]


locations = read_locations()

In [None]:
#locations

### Create Topic Utility Function

The `create_kafka_topic` function creates a Kafka topic based on your configuration settings. For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic named `DoeJohn-locations`. The function does not create the topic if it already exists.

In [10]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    """Create the topic ``<topic_prefix>-<topic_name>`` unless it already exists.

    Args:
        topic_name: suffix appended to ``config['topic_prefix']``.
        config: dict providing ``bootstrap_servers``, ``client_id`` and
            ``topic_prefix``.
        num_partitions: partition count used when the topic is created.
        replication_factor: replication factor used when the topic is created.

    Prints whether the topic was created or already existed; any other
    admin error propagates to the caller.
    """
    name = '{}-{}'.format(config['topic_prefix'], topic_name)

    admin_client = KafkaAdminClient(
        bootstrap_servers=config['bootstrap_servers'],
        client_id=config['client_id']
    )
    try:
        admin_client.create_topics(new_topics=[
            NewTopic(
                name=name,
                num_partitions=num_partitions,
                replication_factor=replication_factor
            )
        ])
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError:
        print('Topic "{}" already exists'.format(name))
    finally:
        # Each call opens its own admin client; release its broker connections.
        admin_client.close()


create_kafka_topic('locations')
create_kafka_topic('accelerations')

Topic "CurryAdam-locations" already exists
Topic "CurryAdam-accelerations" already exists


### Kafka Producer

The following code creates a `KafkaProducer` object which you can use to send Python objects that are serialized as JSON.

**Note:** This producer serializes Python objects as JSON. This means that the object must be JSON serializable. For example, Python `datetime` values are not JSON serializable and must be converted to a string (e.g. ISO 8601) or a numeric value (e.g. a Unix timestamp) before being sent.

In [11]:
# Producer whose message values are JSON-encoded and then UTF-8 encoded to bytes.
producer = KafkaProducer(
    bootstrap_servers=config['bootstrap_servers'],
    value_serializer=lambda value: json.dumps(value).encode('utf-8'),
)

### Send Data Function

The `send_data` function sends a Python object to a Kafka topic. This function adds the `topic_prefix` to the topic so `send_data('locations', data)` sends a JSON serialized message to `DoeJohn-locations`. The function also registers callbacks to let you know if the message has been sent or if an error has occurred.

In [5]:
def on_send_success(record_metadata):
    """Callback for a successful ``producer.send``: print where the message landed.

    Args:
        record_metadata: object exposing ``topic``, ``partition`` and ``offset``.
    """
    topic = record_metadata.topic
    partition = record_metadata.partition
    offset = record_metadata.offset
    print(
        'Message sent:\n'
        f'    Topic: "{topic}"\n'
        f'    Partition: {partition}\n'
        f'    Offset: {offset}'
    )
    
def on_send_error(excp):
    """Errback for a failed ``producer.send``: report the exception.

    Args:
        excp: the exception the send future failed with.
    """
    # print() has no ``exc_info`` keyword (that belongs to ``logging``);
    # passing it raised TypeError inside the errback and hid the real error.
    print('Message send failed: {!r}'.format(excp))
    # TODO: handle the exception (e.g. retry) if needed.

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    """Asynchronously send ``data`` to the topic ``<topic_prefix>-<topic>``.

    Args:
        topic: topic suffix; ``config['topic_prefix']`` is prepended.
        data: value handed to ``producer.send`` (serialized by the producer).
        config: dict providing ``topic_prefix``.
        producer: producer used for the send.
        msg_key: message key string, UTF-8 encoded before sending; when
            ``None`` a random ``uuid4().hex`` string is used instead.

    ``on_send_success`` / ``on_send_error`` are attached to the returned
    future, so the outcome is reported when the send completes.
    """
    topic_name = '{}-{}'.format(config['topic_prefix'], topic)
    key = uuid.uuid4().hex if msg_key is None else msg_key
    future = producer.send(topic_name, value=data, key=key.encode('utf-8'))
    future.add_callback(on_send_success).add_errback(on_send_error)

In [6]:
# A tiny JSON-serializable payload to check the producer end to end.
example_data = {'key1': 'value1', 'key2': 'value2'}

send_data('locations', example_data)

Message sent:
    Topic: "CurryAdam-locations"
    Partition: 0
    Offset: 38


In [14]:
import os
from pathlib import Path

# Reference: https://www.youtube.com/watch?v=HIz0pUXhM3U
endpoint_url = 'https://storage.budsc.midwest-datascience.com'
current_dir = Path.cwd().absolute()

base_dir = '/home/jovyan/dsc650/data/processed/bdd/'
# NOTE(review): these rebind the names that held the record lists loaded
# earlier in the notebook; from here on they are directory path strings.
accelerations = f'{base_dir}accelerations/'
locations = f'{base_dir}locations/'
time_dir = os.listdir(locations)

test_dir = '/home/jovyan/dsc650/data/processed/bdd/locations//t=007.8/'

In [17]:
def time_path(folder_path):
    """Index the ``<key>=<number>`` partition entries of ``folder_path``.

    Parses the names of the entries in a folder (e.g. ``t=007.8``) so each
    partition can be iterated in time order; each entry is keyed by the
    numeric value after ``=`` (``7.8``).

    Args:
        folder_path: directory whose partition entries are named ``<key>=<number>``.

    Returns:
        tuple: ``(time_l, time_dict, folder_dir)`` where ``time_l`` is the
        ascending list of distinct numeric values, ``time_dict`` maps each
        value to its entry name, and ``folder_dir`` is ``folder_path``.
        Entries whose names contain no ``=`` are skipped.

    Raises:
        ValueError: if the text after ``=`` is not a number.
    """
    time_dict = {}
    for entry in os.listdir(folder_path):
        if '=' not in entry:
            continue
        time_dict[float(entry.split('=')[1])] = entry
    # Sorted so callers such as `send` can replay partitions in time order.
    time_l = sorted(time_dict)
    return time_l, time_dict, folder_path

#time_path(locations)

In [None]:
def send(f, topic='locations'):
    """Replay each time-partition directory under ``f`` to Kafka.

    Partitions are discovered with ``time_path``. Each partition directory is
    passed to ``read_parq`` (defined elsewhere) and the result is sent with
    ``send_data``. Before each send the loop sleeps for the gap between the
    current and previous partition values, starting from 0. This reproduces
    the relative spacing of the data; the values are assumed to be seconds
    (TODO confirm).

    Args:
        f: directory containing ``<key>=<number>`` partition sub-directories.
        topic: topic suffix passed to ``send_data``. Defaults to
            ``'locations'`` for backward compatibility; pass
            ``'accelerations'`` when replaying that dataset.
    """
    previous_t = 0.0
    time_l, time_dict, folder_dir = time_path(f)
    for t in time_l:
        partition_dir = os.path.join(folder_dir, time_dict[t], '')
        # Named `payload` so it does not shadow the `pq` (pyarrow.parquet) alias.
        payload = read_parq(partition_dir)
        # A negative gap (out-of-order input) means "send immediately".
        time.sleep(max(0.0, t - previous_t))
        previous_t = t
        send_data(topic, payload)


send(locations)