#  Assignment-2B-Task1_flight_producer: Producing the streaming data (30%)

Name : Pichaphop Sunthornjittanon

Student ID : 31258301

In this section, you will need to implement an Apache Kafka producer to simulate the
real-time streaming of the data.

Important:

● In this task, you need to generate the event timestamp in UTC timezone for each data
record in the producer, and then convert the timestamp to unix-timestamp format
(keeping UTC timezone) to simulate the “ts” column. For example, if the current time
is 2021-9-28 12:39:45 UTC, it should be converted to the value of 1632796806, and
stored in the “ts” column

● Please do not use Spark in this task



#### Event Flight Producer

Write a python program that loads all the data from “flight*.csv”. Save the file as
Assignment-2B-Task1_flight_producer.ipynb.
Your program should send X number and Y number of records from each producer
following the sequence to the Kafka stream every 5 seconds. X represents the records to
send in a particular batch, whereas Y represents the records to send in the next batch (pending
records). The sequence of the batch is depicted in Fig.1.

There are some steps need to be carried out for this task:

1. Generate random numbers A and B, whose values are between 70-100 (inclusive) and
5-10 (inclusive) respectively, which are regenerated for each keyFlight. The
keyFlights are generated from the column ‘DAY_OF_WEEK’ in the dataset which has
7 unique keys. These values 1, 2, 3, 4, 5, 6, and, 7 represents ‘sunday’,‘monday’,
‘tuesday’, ‘wednesday’,’thursday’,’friday’,’saturday’
2. You will need to append event time in unix-timestamp format (as mentioned above)
for each key. Assuming that there are 7 keys in the flight-dataset as mentioned above,
there will be 7 unix-timestamp for each batch.
3. Each batch data contains 7 group (7 sub batches) instances generated from each key.
All of them are concatenated in the form of the list of dictionaries

a. Explanation of a group instances/records


i. If A1 represents a group of instances/records generated from key = ‘1’
and B1 represents a group of pending instances/records generated from
key = ‘1’, thus batch 1 (X1) contains [A1; A2; A3; A4; A5; A6; A7] and
batch 1 pending (Y1) contains [B1; B2; B3; B4; B5; B6; B7].

ii. A1 and B1 have the same ‘ts’ as it is generated from the same batch atthe same time. The same case is also the same for A2 and B2 and so on.

iii. Given random numbers A and B, the number of instances in A1, A2 and
B1, B2 and so on vary

b. Explanation of a dictionary.
Dictionary represents an instance of data which output can be seen as follow


i. Example of a dictionary with key = ‘1’ {‘ts’:1632796806,..,
‘DAY_OF_WEEK’:1, ‘month’:1,...}

ii. Example of a dictionary with key = ‘7’ {‘ts’:1632744322,...,
‘DAY_OF_WEEK’:7, ‘month’:3,...}

iii. Dictionary is a part of a sub batch data. A sub batch data is a part of a
batch data X. This also applies for pending data B.

4. Simulation of data stream in Fig.1.

a. At time1: X1 and Y1 are generated on the producer side, but only X1 is sent.

b. At time2: X2 and Y2 are generated on the producer side, but only X2 and
pending data from the previous batch (Y1) are sent to the consumer.


5. If the data in each key is exhausted, restart from the first sequence again.


6. Pseudocode for this task:

a. Take the DAY_OF_WEEK column as the key, name a variable KeyFlights
which contains the set of keys (7 keys).

b. Create a function getFlightRecords, which returns a variable named
flightRecords, which is a dictionary that contains all flight data with their
associated keys (step 3).

c. Create a topic called ‘flightTopic’

d. Create an instance variable called ‘flightProducer’

e. for each keyFlight in KeyFlights

i. Generate A[‘keyFlight’] and B[‘keyFlight’] and give both the
timestamp as formatted in 3.b or Fig.2.

ii. Concatenate all A and B. It will form the data batch X and Y
respectively, see Fig.2.

f. Send X and Y to the consumer following the rule in step 4 or Fig.1

In [1]:
# 1. Import libraries

from time import sleep
from json import dumps
from kafka import KafkaProducer
import random
import datetime as dt
# import csv
import pandas as pd
import time

In [2]:
# 2. Load flight data to dataframe

# Initiate flight_df
flight_df = pd.DataFrame()

# Load all flight CSV file to dataframe
for number in range(1,21):
    
    # Load each file
    cur_file = pd.read_csv(f'data/flight{number}.csv')

    # Combine to the previous file
    flight_df = flight_df.append(cur_file)


In [3]:
# 2. Create useful functions

# Create publish_message function - publish data and print the size of data that's just sent
def publish_message(producer_instance, topic_name, data):
    try:
        producer_instance.send(topic_name, data)
        print('Message published successfully. Data Size: ' + str(len(data)))
    except Exception as ex:
        print('Exception in publishing message.')
        print(str(ex))
        
# Create connect_kafka_producer function - configure kafka producer
def connect_kafka_producer():
    _producer = None
    try:
        _producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                                  value_serializer=lambda x: dumps(x).encode('ascii'),
                                  api_version=(0, 10))
    except Exception as ex:
        print('Exception while connecting Kafka.')
        print(str(ex))
    finally:
        return _producer
    
# Create a function getFlightRecords, which returns a variable named flightRecords, 
# which is a dictionary that contains all flight data with their associated keys (Step B in the pseudocode)
def getFlightRecords(key) :
    flightRecords = flight_df[flight_df['DAY_OF_WEEK'] == key].to_dict(orient='records')
    
    return flightRecords


In [4]:
# 3.Write the code that follow pseudcode in 1.6 from specification

if __name__ == '__main__':

    # a. Take the DAY_OF_WEEK column as the key, name a variable KeyFlights which contains the set of keys (7 keys)
    KeyFlights = set(flight_df['DAY_OF_WEEK'])

    # b. Create a function getFlightRecords, which returns a variable named flightRecords, 
    # which is a dictionary that contains all flight data with their associated keys (Done in the previous section)

    # Use function getFlightRecords to get data of each associated keys flightRecords_1 - flightRecords_7 (list of dict)
    for keyFlight in KeyFlights :

        # Create flightRecord_1 - flightRecord_7 
        globals()['flightRecords_%s' % keyFlight]  = getFlightRecords(keyFlight)

    # c. Create a topic called ‘flightTopic
    topic_name = 'flightTopic'

    # d. Create an instance variable called ‘flightProducer’
    flightProducer = connect_kafka_producer()

    # e.

    # Initialise time t
    t = 1

    while True :

        # e. for each keyFlight in KeyFlights
        for keyFlight in KeyFlights :

            # Generate event unix-timestamp (UTC timezone)
            ts = int(dt.datetime.utcnow().timestamp())

            ############################################## For An ##############################################


            # Generate random number A between 70-100(inclusive)
            num_A = random.randint(70,100)    
            
            # Create stream_data that stores data that we want to put to An 
            stream_data = globals()['flightRecords_%s' % keyFlight][0:num_A]
            
            ########### This is a process of selecting flight data in sequence and reusing data once it is exhausted ####
            # Remove the stream data from the flight record in each key
            globals()['flightRecords_%s' % keyFlight] = globals()['flightRecords_%s' % keyFlight][num_A:
                                                                len(globals()['flightRecords_%s' % keyFlight])]
            
            # Append them back to the end of the list
    
            globals()['flightRecords_%s' % keyFlight].extend(stream_data)

            ###########
            
            # Create An variable that contains stream data that we want to send
            globals()['A%s' % keyFlight] = stream_data
            

            # Add timestamp to An
            for index in range(0,len(globals()['A%s' % keyFlight])) :
                globals()['A%s' % keyFlight][index]['ts'] = ts

            ####################################################################################################
            
            ############################################## For Bn ##############################################

            # Generate random number B between 5-10(inclusive)
            num_B = random.randint(5,10)

            # Create stream_data that stores data that we want to put to Bn
            stream_data = globals()['flightRecords_%s' % keyFlight][0:num_B]
            
            ########### This is a process of selecting flight data in sequence and reusing data once it is exhausted ####
            
            # Remove the stream data from the flight record in each key
            globals()['flightRecords_%s' % keyFlight] = globals()['flightRecords_%s' % keyFlight][num_B:
                                                                                len(globals()['flightRecords_%s' % keyFlight])]    
    

    
            # Append them back to the end of the list
            globals()['flightRecords_%s' % keyFlight].extend(stream_data)

            ###########
            
            # Create Bn variable that contains stream data that we want to send
            globals()['B%s' % keyFlight] = stream_data
    

            # Add timestamp to B
            for index in range(0,len(globals()['B%s' % keyFlight])) :
                globals()['B%s' % keyFlight][index]['ts'] = ts
            
            sleep(1)
            
            ####################################################################################################

        # Concate A1-A7 to get Xt
        X = []
        X.extend(A1)
        X.extend(A2)
        X.extend(A3)
        X.extend(A4)
        X.extend(A5)
        X.extend(A6)
        X.extend(A7)

        # Concate B1-B7 to get Yt
        Y = []
        Y.extend(B1)
        Y.extend(B2)
        Y.extend(B3)
        Y.extend(B4)
        Y.extend(B5)
        Y.extend(B6)
        Y.extend(B7)

        print(f'\n################ T{t} ################')

        print(f'\n### Unix Timestamp : {ts} ###')

        # If t=1
        if t == 1 :

            print(f'X{t}',' : Data size =', len(X))

            # Publish only X1
            publish_message(flightProducer, topic_name, X)

        # If time is greater than 1
        else :

            print(f'X{t}',' : Data size =', len(X))
            print(f'Y{t-1}',' : Data size =', len(Y_previous))

            # Concat Xt and Yt-1
            flight_data = X+Y_previous

            # Publish Xt and Yt-1
            publish_message(flightProducer, topic_name, flight_data)

        # Store Y_previous with Yt
        Y_previous = Y.copy()

        # Increase time by 1
        t += 1

        # send X number and Y number of records every 5 seconds
        sleep(3)




################ T1 ################

### Unix Timestamp : 1634304690 ###
X1  : Data size = 601
Message published successfully. Data Size: 601

################ T2 ################

### Unix Timestamp : 1634304701 ###
X2  : Data size = 610
Y1  : Data size = 57
Message published successfully. Data Size: 667

################ T3 ################

### Unix Timestamp : 1634304711 ###
X3  : Data size = 585
Y2  : Data size = 58
Message published successfully. Data Size: 643

################ T4 ################

### Unix Timestamp : 1634304721 ###
X4  : Data size = 587
Y3  : Data size = 47
Message published successfully. Data Size: 634

################ T5 ################

### Unix Timestamp : 1634304731 ###
X5  : Data size = 593
Y4  : Data size = 51
Message published successfully. Data Size: 644

################ T6 ################

### Unix Timestamp : 1634304741 ###
X6  : Data size = 546
Y5  : Data size = 47
Message published successfully. Data Size: 593

################ T7 ##########

KeyboardInterrupt: 