In [67]:
# Import required packages
import pandas as pd
from kafka import KafkaProducer
import json
import os
import csv
import time

In [68]:
# This method archives the file after processing
def archive_file(directory,filename):
    """Mark a processed file as archived by renaming its extension to ``.txt``.

    The file stays inside ``directory``; only its extension changes
    (``2009.csv`` becomes ``2009.txt``).

    Args:
        directory: Folder that contains the file.
        filename: Name of the file inside ``directory``.

    Returns:
        None. Success or failure is reported on stdout only; ordinary errors
        are not raised to the caller.
    """
    try:
        source_path = os.path.join(directory, filename)
        new_filename = os.path.splitext(filename)[0] + '.txt'
        os.rename(source_path, os.path.join(directory, new_filename))
        print(f'{source_path} ARCHIVE SUCCESSFUL')
    except Exception as exc:
        # Best-effort: report and carry on. `Exception` (not a bare except)
        # lets KeyboardInterrupt / SystemExit still stop the job.
        print(f'Error in archiving file: {exc}')

In [69]:
# Name of the topic
topic= 'flight-topic'

# This method posts messages into Kafka topic
def send_kafka_topic(csv):
    """Publish every row of a DataFrame to the Kafka topic as a JSON message.

    Args:
        csv: pandas DataFrame whose rows are sent one message per row.
            (Inside this function the name shadows the stdlib ``csv`` module.)

    Returns:
        True when every row was handed to the producer and the producer was
        closed without error; False if creating the producer, sending a row
        or closing the producer raised.
    """
    try:
        # Create a Kafka producer instance
        producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                            value_serializer=lambda x: json.dumps(x).encode('utf-8'))
        try:
            # Iterate over each row of the DataFrame and send it to the Kafka topic
            # NOTE(review): json.dumps writes missing values as NaN, which is not
            # strict JSON -- confirm the consumers of this topic accept it.
            # The value returned by send() is discarded, so per-message delivery
            # status is not checked here.
            for _, row in csv.iterrows():
                producer.send(topic, value=row.to_dict())
        finally:
            # Close the Kafka producer even when a send fails, so the
            # connection is not leaked.
            producer.close()
    except Exception as exc:
        print(f'Write to Kafka topic {topic} failed: {exc}')
        return False
    print(F'Write to Kafka topic {topic} completed')
    return True

In [70]:
# Read CSV file 
def read_csv_file(directory,csv_file):
    """Load one CSV file, publish its rows to Kafka and archive it on success.

    Args:
        directory: Folder that contains the file.
        csv_file: Name of the CSV file inside ``directory``.

    Returns:
        Always a ``(status, count)`` tuple so callers can unpack it safely:
        ``('COMPLETED', <rows in file>)`` when the rows were sent,
        ``('FAILED during Kafka Load', 0)`` when the Kafka publish failed,
        ``('FAILED', 0)`` when reading or processing raised.
    """
    try:
        file_path = os.path.join(directory, csv_file)
        print('File path: ', file_path)
        # Parse in 5000-row chunks, then stitch them into a single frame.
        chunks = pd.read_csv(file_path, chunksize=5000)
        frame = pd.concat(chunks)
        print(frame.head(2))
        if not send_kafka_topic(frame):
            # Leave the file un-archived so it stays visible for a later attempt.
            return 'FAILED during Kafka Load', 0
        archive_file(directory,csv_file)
        return 'COMPLETED', len(frame)
    except Exception as exc:
        print(f'Error while processing {csv_file}: {exc}')
        return 'FAILED', 0
    

In [71]:
# Path to directory
directory = '../FileGateway/'

retry = 0
# Wait time
n = 20
# Stop once `retry` reaches this value. `retry` is reset to 0 whenever a file
# is handled and incremented after every sleep.
max_retries = 10

# Check for input files from the directory. Sleep after processing and RETRY the steps.
while True:
    # Check for CSV files in directory
    csv_files = [f for f in os.listdir(directory) if f.endswith('.csv')]
    if csv_files:
        # Read each CSV file
        for csv_file in csv_files:
            result = read_csv_file(directory,csv_file)
            # Accept a (status, count) tuple as well as a bare status string,
            # so a failed file is reported instead of crashing the loop on unpack.
            if isinstance(result, tuple):
                val, num = result
            else:
                val, num = result, 0
            print(f'File {csv_file} read status : {val}. Number of Entries {num}')
            # NOTE(review): the counter is reset even when the file failed; a file
            # that keeps failing keeps its .csv name and is retried on every poll.
            retry=0
    else:
        print(f"No CSV files found in directory {directory}. Retrying after {n} sec")
    print("-"*65)
    # Wait n seconds before checking again
    time.sleep(n)
    retry+=1
    # check for retry count
    if(retry>=max_retries):
        print('Job run Completed')
        break
    


No CSV files found in directory ../FileGateway/. Retrying after 20 sec
-----------------------------------------------------------------
No CSV files found in directory ../FileGateway/. Retrying after 20 sec
-----------------------------------------------------------------
File path:  ../FileGateway/2009.csv
      FL_DATE OP_CARRIER  OP_CARRIER_FL_NUM ORIGIN DEST  CRS_DEP_TIME  \
0  2009-01-01         XE               1204    DCA  EWR          1100   
1  2009-01-01         XE               1206    EWR  IAD          1510   

   DEP_TIME  DEP_DELAY  TAXI_OUT  WHEELS_OFF  ...  CRS_ELAPSED_TIME  \
0    1058.0       -2.0      18.0      1116.0  ...              62.0   
1    1509.0       -1.0      28.0      1537.0  ...              82.0   

   ACTUAL_ELAPSED_TIME  AIR_TIME  DISTANCE  CARRIER_DELAY  WEATHER_DELAY  \
0                 68.0      42.0     199.0            NaN            NaN   
1                 75.0      43.0     213.0            NaN            NaN   

  NAS_DELAY  SECURITY_DELAY