In [1]:
import json
import uuid

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

import os
import pandas as pd
import datetime
import threading
from pathlib import Path
import time
import s3fs
import pyarrow.parquet as pq
from collections import namedtuple

In [2]:
endpoint_url='https://storage.budsc.midwest-datascience.com'
s3 = s3fs.S3FileSystem(
    anon=True,
    client_kwargs={
        'endpoint_url': endpoint_url
    }
)

In [3]:
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Supraja',
    last_name='Rapuru'
)

config['client_id'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)
config['topic_prefix'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Supraja',
 'last_name': 'Rapuru',
 'client_id': 'RapuruSupraja',
 'topic_prefix': 'RapuruSupraja'}

### Create Topic Utility Function

The `create_kafka_topic` helps create a Kafka topic based on your configuration settings.  For instance, if your first name is *Vinay* and your last name is *Nagaraj*, `create_kafka_topic('locations')` will create a topic with the name `DoeJohn-locations`.  The function will not create the topic if it already exists. 

In [4]:
def loadParquet(parq_path):
    pqr = spark.read.parquet(parq_path)
    # Convert from spark dataframe to pandas dataframe
    pqr = pqr.toPandas()
    return pqr

def splitstr(std):
    before, after = str(std).split('.')
    return before, after

def startTimer(results_dir):
    # Loop on time
    print("call function here")
    retval = startTimedParquetStreamUpdateLoop(results_dir)
    # Stop if time is over and there are no more partitions.
    if ((time.time() - start_time) < 70 and retval == 0):
        t = threading.Timer(interval, startTimer(results_dir))

In [13]:
import logging
logging.basicConfig(level=logging.DEBUG)
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    kafka_host = '192.168.1.164'
    kafka_port = '9092'
    kafka_server = '%s:%s' % (kafka_host, kafka_port)
    #bootstrap_servers = config['bootstrap_servers']
    bootstrap_servers = 'localhost:9092'
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    api_version=(2, 7, 0)
    name = '{}-{}'.format(topic_prefix, topic_name)
    
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers, 
        client_id=client_id
    )
    
    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    topic_list = [topic]
    try:
        admin_client.create_topics(new_topics=topic_list)
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError as e:
        print('Topic "{}" already exists'.format(name))
    
create_kafka_topic('locations')
create_kafka_topic('accelerations')

DEBUG:kafka.admin.client:Starting KafkaAdminClient with configuration: {'bootstrap_servers': 'localhost:9092', 'client_id': 'RapuruSupraja'}
DEBUG:kafka.metrics.metrics:Added sensor with name connections-closed
DEBUG:kafka.metrics.metrics:Added sensor with name connections-created
DEBUG:kafka.metrics.metrics:Added sensor with name select-time
DEBUG:kafka.metrics.metrics:Added sensor with name io-time
DEBUG:kafka.client:Initiating connection to node bootstrap-0 at localhost:9092
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent-received
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name request-latency
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-sent
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.bytes-received
DEBUG:kafka.metrics.metrics:Added sensor with name node-bootstrap-0.latency
DEBUG:kafka.c

NoBrokersAvailable: NoBrokersAvailable

### Kafka Producer

The following code creates a `KafkaProducer` object which you can use to send Python objects that are serialized as JSON.

**Note:** This producer serializes Python objects as JSON. This means that object must be JSON serializable.  As an example, Python `DateTime` values are not JSON serializable and must be converted to a string (e.g. ISO 8601) or a numeric value (e.g. a Unix timestamp) before being sent.

In [6]:
producer = KafkaProducer(
  bootstrap_servers=config['bootstrap_servers'],
  value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

### Send Data Function

The `send_data` function sends a Python object to a Kafka topic. This function adds the `topic_prefix` to the topic so `send_data('locations', data)` sends a JSON serialized message to `NagarajVinay-locations`. The function also registers callbacks to let you know if the message has been sent or if an error has occured. 

In [7]:
def on_send_success(record_metadata):
    print('Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'.format(
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset
    ))
    
def on_send_error(excp):
    print('I am an errback', exc_info=excp)
    # handle exception

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    topic_prefix = config['topic_prefix']
    topic_name = '{}-{}'.format(topic_prefix, topic)
    
    if msg_key is not None:
        key = msg_key
    else:
        key = uuid.uuid4().hex
    
    producer.send(
        topic_name, 
        value=data,
        key=key.encode('utf-8')
    ).add_callback(on_send_success).add_errback(on_send_error)

In [8]:
example_data = dict(
    key1='value1',
    key2='value2'
)

send_data('locations', example_data)

Message sent:
    Topic: "NagarajVinay-locations"
    Partition: 0
    Offset: 2


In [9]:
acceleration_columns = [
    'offset',
    'id',
    'ride_id',
    'uuid',
    'x',
    'y',
    'z',
     't'
]
Acceleration = namedtuple('Acceleration', acceleration_columns)

df_acc = pq.ParquetDataset('s3://data/processed/bdd/accelerations',filesystem=s3).read_pandas().to_pandas()
df_acc = df_acc[acceleration_columns].sort_values(by=['offset'])

location_columns = [
    'offset',
    'id',
    'ride_id',
    'uuid',
    'course',
    'latitude',
    'longitude',
    'geohash',
    'speed',
    'accuracy',
     't'
]
Location = namedtuple('Location', location_columns)

df_loc = pq.ParquetDataset('s3://data/processed/bdd/locations',filesystem=s3).read_pandas().to_pandas()
df_loc = df_loc[location_columns].sort_values(by=['offset'])

In [10]:
start_time = time.time()
offset = []
topics = []
datas = []
for i in range(df_acc.shape[0]):
    data_acc = df_acc.loc[i].to_dict()
    offset.append(data_acc['offset'])
    datas.append(data_acc)
    topics.append('accelerations')
for i in range(df_loc.shape[0]):
    data_loc = df_loc.loc[i].to_dict()
    offset.append(data_loc['offset'])
    datas.append(data_loc)
    topics.append('locations')

In [11]:
#combining the lists into a dataframe and sorting by offset
df = pd.DataFrame({'offset': offset, 'topics': topics, 'datas': datas})
df = df.sort_values(by = ['offset'])
df.head()

Unnamed: 0,offset,topics,datas
0,0.822061,accelerations,"{'offset': 0.8220608865228429, 'id': '58682c5d..."
1,0.842061,accelerations,"{'offset': 0.8420608865228429, 'id': '58682c5d..."
2,0.862061,accelerations,"{'offset': 0.862060886522843, 'id': '58682c5d4..."
3,0.882061,accelerations,"{'offset': 0.882060886522843, 'id': '58682c5d4..."
4,0.902061,accelerations,"{'offset': 0.9020608865228429, 'id': '58682c5d..."


In [12]:
for i in range(df.shape[0]):
    offset = df['offset'][i]
    while (time.time() - start_time) < offset:
        pass
    send_data(df['topics'][i], df['datas'][i])

Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 0
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 1
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 2
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 3
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 4
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 5
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 6
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 7
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 8
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 9
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partition: 0
    Offset: 10
Message sent:
    Topic: "NagarajVinay-accelerations"
    Partit