In [1]:
import json
import uuid

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

import os
import pandas as pd
import datetime
import threading
from pathlib import Path
import time
import s3fs
import pyarrow.parquet as pq
from collections import namedtuple

In [3]:
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Reenie',
    last_name='Christudass'
)

config['client_id'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)
config['topic_prefix'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)

print(config)


{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'], 'first_name': 'Reenie', 'last_name': 'Christudass', 'client_id': 'ChristudassReenie', 'topic_prefix': 'ChristudassReenie'}


In [4]:
def loadParquet(parq_path):
    pqr = spark.read.parquet(parq_path)
    # Convert from spark dataframe to pandas dataframe
    pqr = pqr.toPandas()
    return pqr

def splitstr(std):
    before, after = str(std).split('.')
    return before, after

def startTimer(results_dir):
    # Loop on time
    print("call function here")
    retval = startTimedParquetStreamUpdateLoop(results_dir)
    # Stop if time is over and there are no more partitions.
    if ((time.time() - start_time) < 70 and retval == 0):
        t = threading.Timer(interval, startTimer(results_dir))

In [5]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)
    
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers, 
        client_id=client_id
    )
    
    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    topic_list = [topic]
    try:
        admin_client.create_topics(new_topics=topic_list)
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError as e:
        print('Topic "{}" already exists'.format(name))

In [6]:
    
create_kafka_topic('locations')
create_kafka_topic('accelerations')

Created topic "ChristudassReenie-locations"
Created topic "ChristudassReenie-accelerations"


In [7]:
producer = KafkaProducer(
  bootstrap_servers=config['bootstrap_servers'],
  value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

In [8]:
def on_send_success(record_metadata):
    print('Message sent:\n    Topic: "{}"\n    Partition: {}\n    Offset: {}'.format(
        record_metadata.topic,
        record_metadata.partition,
        record_metadata.offset
    ))
    
def on_send_error(excp):
    print('I am an errback', exc_info=excp)
    # handle exception

def send_data(topic, data, config=config, producer=producer, msg_key=None):
    topic_prefix = config['topic_prefix']
    topic_name = '{}-{}'.format(topic_prefix, topic)
    
    if msg_key is not None:
        key = msg_key
    else:
        key = uuid.uuid4().hex
    
    producer.send(
        topic_name, 
        value=data,
        key=key.encode('utf-8')
    ).add_callback(on_send_success).add_errback(on_send_error)

In [9]:
example_data = dict(
    key1='value1',
    key2='value2'
)

send_data('locations', example_data)

Message sent:
    Topic: "ChristudassReenie-locations"
    Partition: 0
    Offset: 0


In [33]:
import os
import pandas as pd
import pyarrow.parquet as pq

# Set the path to the directory containing the parquet files
path = "dsc650/data/processed/bdd/accelerations"

# Create an empty list to store the dataframes
dfs = []

# Loop through each subdirectory in the path
for subdir in os.listdir(path):
    subdir_path = os.path.join(path, subdir)
    # Loop through each parquet file in the subdirectory
    for file in os.listdir(subdir_path):
        if file.endswith(".parquet"):
            file_path = os.path.join(subdir_path, file)
            # Read the parquet file into a dataframe
            table = pq.read_table(file_path)
            df = table.to_pandas()
            # Append the dataframe to the list of dataframes
            dfs.append(df)

# Concatenate all the dataframes into one
df_acc = pd.concat(dfs, ignore_index=True)
df_acc['timestamp'] = df['timestamp'].astype(str)
# Print the dataframe
print(df_acc.head())


                                 id                           ride_id   
0  85c61911b7fe2ced1000c33c9e932706  6760ffa3f41908695d1405b776c3e8d5  \
1  85c61911b7fe2ced1000c33c9e932706  6760ffa3f41908695d1405b776c3e8d5   
2  85c61911b7fe2ced1000c33c9e932706  6760ffa3f41908695d1405b776c3e8d5   
3  85c61911b7fe2ced1000c33c9e932706  6760ffa3f41908695d1405b776c3e8d5   
4  85c61911b7fe2ced1000c33c9e932706  6760ffa3f41908695d1405b776c3e8d5   

                               uuid                   timestamp     offset   
0  dad7eae44e784b549c8c5a3aa051a8c7  1970-01-01 00:25:04.256854  17.912913  \
1  dad7eae44e784b549c8c5a3aa051a8c7  1970-01-01 00:25:04.256854  17.932913   
2  dad7eae44e784b549c8c5a3aa051a8c7  1970-01-01 00:25:04.256854  17.951913   
3  dad7eae44e784b549c8c5a3aa051a8c7  1970-01-01 00:25:04.256854  17.971913   
4  dad7eae44e784b549c8c5a3aa051a8c7  1970-01-01 00:25:04.256854  17.990913   

       x      y      z  timelapse                                  filename  
0 -1.044  0.09

In [32]:
import os
import pandas as pd
import pyarrow.parquet as pq

# Set the path to the directory containing the parquet files
path = "dsc650/data/processed/bdd/locations"

# Create an empty list to store the dataframes
dfs = []

# Loop through each subdirectory in the path
for subdir in os.listdir(path):
    subdir_path = os.path.join(path, subdir)
    # Loop through each parquet file in the subdirectory
    for file in os.listdir(subdir_path):
        if file.endswith(".parquet"):
            file_path = os.path.join(subdir_path, file)
            # Read the parquet file into a dataframe
            table = pq.read_table(file_path)
            df = table.to_pandas()
            # Append the dataframe to the list of dataframes
            dfs.append(df)

# Concatenate all the dataframes into one
df_loc = pd.concat(dfs, ignore_index=True)
df_loc['timestamp'] = df['timestamp'].astype(str)
# Print the dataframe
print(df_loc.head())

                                 id                           ride_id   
0  85c61911b7fe2ced1000c33c9e932706  6760ffa3f41908695d1405b776c3e8d5  \
1  58682c5d48cad9d9e103431d773615bf  c9a2b46c9aa515b632eddc45c4868482   
2  759e4b98ce8cfadd1004453cb2107436  91126a54bb966b38f64c62aded306c37   
3  d23ab293596c360dab6e8fcdb1160292  57ac74e28d873fb0b1c7d9998349f008   
4  5f0b30f49da18ebc1b22e3f84a9a14db  01d87368140cb74ba7cead55b6dbfc7c   

                               uuid                   timestamp     offset   
0  dad7eae44e784b549c8c5a3aa051a8c7  1970-01-01 00:25:04.256854  18.077913  \
1  19b9aa10588646b3bf22c9b4865a7995                         NaN  18.525061   
2  2cc21cea5b534b59b596172f8dacbe5b                         NaN  85.620195   
3  8c6d4a328f8f4d1abd748eaa64db26b3                         NaN  85.294091   
4  b59f82a2612048079456e8b5f9d5fdb9                         NaN  85.385934   

       course   latitude  longitude       geohash  speed  accuracy  timelapse   
0  153.2812

In [34]:
start_time = time.time()
offset = []
topics = []
datas = []
for i in range(df_acc.shape[0]):
    data_acc = df_acc.loc[i].to_dict()
    offset.append(data_acc['offset'])
    datas.append(data_acc)
    topics.append('accelerations')
for i in range(df_loc.shape[0]):
    data_loc = df_loc.loc[i].to_dict()
    offset.append(data_loc['offset'])
    datas.append(data_loc)
    topics.append('locations')

In [35]:
#combining the lists into a dataframe and sorting by offset
df = pd.DataFrame({'offset': offset, 'topics': topics, 'datas': datas})
df = df.sort_values(by = ['offset'])
df.head()


Unnamed: 0,offset,topics,datas
22800,0.822061,accelerations,"{'id': '58682c5d48cad9d9e103431d773615bf', 'ri..."
22801,0.842061,accelerations,"{'id': '58682c5d48cad9d9e103431d773615bf', 'ri..."
22802,0.862061,accelerations,"{'id': '58682c5d48cad9d9e103431d773615bf', 'ri..."
22803,0.882061,accelerations,"{'id': '58682c5d48cad9d9e103431d773615bf', 'ri..."
22804,0.902061,accelerations,"{'id': '58682c5d48cad9d9e103431d773615bf', 'ri..."


In [37]:
for i in range(df.shape[0]):
    offset = df['offset'][i]
    while (time.time() - start_time) < offset:
        pass
    #print(df['topics'][i], df['datas'][i])
    send_data(df['topics'][i], df['datas'][i])

Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23512
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23513
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23514
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23515
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23516
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23517
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23518
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23519
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23520
Message sent:
    Topic: "ChristudassReenie-accelerations"
    Partition: 0
    Offset: 23521
Message sent:
    Topic: "ChristudassReenie-accelerations"
 