In [1]:
import pyspark
import os
from pyspark.sql import SparkSession

# We need to set the following environment variable, so that Spark knows where YARN runs
os.environ['HADOOP_CONF_DIR']="/etc/hadoop/conf"

# Since we are accessing spark through it's python API, we need to make sure that all executor
# instances run the same version of python. 
os.environ['PYSPARK_PYTHON']="/opt/conda/default/bin/python"
os.environ['PYSPARK_DRIVER_PYTHON']="/opt/conda/default/bin/python"

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark import SparkConf

print("Creating Spark configuration...")
conf = SparkConf() \
    .setAppName("StreamProcessing") \
    .setMaster("local[*]") \
    .set("spark.network.timeout", "20000000ms") \
    .set("spark.executor.heartbeatInterval", "10000ms") \
    .set('spark.logConf', 'true')

print("Building Spark session...")
try:
    spark = SparkSession.builder \
        .config(conf=conf) \
        .getOrCreate()
except Exception as e:
    print("Exception while creating Spark session:")
    print(e)

Creating Spark configuration...
Building Spark session...


23/05/21 22:23:43 WARN Utils: Your hostname, vladi-VirtualBox resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/05/21 22:23:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/21 22:23:44 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [5]:
# Define the schema of the data
schema = StructType([
    StructField("Date", StringType(), True),  # Column "Date" contains string data and can contain null values
    StructField("Time gap", IntegerType(), True),  # Column "Time gap" contains integer data and can contain null values
    StructField("Count", IntegerType(), True),  # Column "Count" contains integer data and can contain null values
    StructField("Average speed", IntegerType(), True)  # Column "Average speed" contains integer data and can contain null values
])


In [6]:
# Define the file names of the captors data
file_names = [
    "CAT17_history.csv", "CB1101_history.csv", "CB1142_history.csv", "CB1143_history.csv", "CB1599_history.csv", 
    "CB1699_history.csv", "CB2105_history.csv", "CB02411_history.csv", "CEE016_history.csv", "CEK18_history.csv", 
    "CEK31_history.csv", "CEK049_history.csv", "CEV011_history.csv", "CJE181_history.csv", "CJM90_history.csv", 
    "CLW239_history.csv", "COM205_history.csv", "CVT387_history.csv"
]

In [7]:
import csv
from datetime import datetime, timedelta

# Function to add missing rows to a CSV file
def add_missing_rows(file_path):
    # Define the start and end dates and the quarter-hour increment
    start_date = datetime(2018, 12, 6)
    end_date = datetime(2023, 3, 31)
    quarter_hour = timedelta(minutes=15)

    # Open the CSV file in read mode
    with open(file_path, 'r') as csv_file:
        reader = csv.reader(csv_file)
        rows = list(reader)

    # Check if the first row is the header
    if rows[0] != ['Date', 'Time gap', 'Count', 'Average speed']:
        print("Error: The file's header is incorrect.")
        return

    # Create a dictionary to store existing data
    existing_data = {}
    for row in rows[1:]:
        existing_data[(row[0], int(row[1]))] = (row[2], row[3])

    # Create a list of all dates and quarter-hours
    dates = []
    current_date = start_date
    while current_date <= end_date:
        for i in range(1, 97):  # 96 quarter-hours in a day
            dates.append((current_date, i))
        current_date += timedelta(days=1)  # increment the day

    # Traverse dates and quarter-hours and fill in missing data
    new_rows = [rows[0]]  # Add the header
    for date, hour in dates:
        key = (date.strftime('%Y-%m-%d'), hour)
        if key in existing_data:  # if data exists for this quarter-hour
            new_row = [date.strftime('%Y-%m-%d'), str(hour), existing_data[key][0], existing_data[key][1]]
        else:  # if data does not exist, fill with default values
            new_row = [date.strftime('%Y-%m-%d'), str(hour), '0', '-1']
        new_rows.append(new_row)

    # Write the new rows back into the CSV file
    with open(file_path, 'w', newline='') as csv_file:
        writer = csv.writer(csv_file)
        writer.writerows(new_rows)

    print("Successfully added missing rows.")

# Get all the file names and call the function on each file
for file_name in file_names:
    add_missing_rows(file_name)


Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.
Successfully added missing rows.


In [8]:
# Initialize a variable to hold the combined data
all_data = None

# Iterate over each filename in the list
for file_name in file_names:
    # Read the current CSV file into a DataFrame using the provided schema
    # Set header=True if the first row of CSV file contains column names
    current_data = spark.read.csv(file_name, schema=schema, header=True)
    
    # If this is the first file, assign its data to all_data
    if all_data is None:
        all_data = current_data
    else:
        # If this is not the first file, append its data to all_data
        # Spark's union method concatenates DataFrames with the same schema
        all_data = all_data.union(current_data)

# Display the content of all_data DataFrame
all_data.show()

[Stage 0:>                                                          (0 + 1) / 1]

+----------+--------+-----+-------------+
|      Date|Time gap|Count|Average speed|
+----------+--------+-----+-------------+
|2018-12-06|       1|    0|           -1|
|2018-12-06|       2|    0|           -1|
|2018-12-06|       3|    0|           -1|
|2018-12-06|       4|    0|           -1|
|2018-12-06|       5|    0|           -1|
|2018-12-06|       6|    0|           -1|
|2018-12-06|       7|    0|           -1|
|2018-12-06|       8|    0|           -1|
|2018-12-06|       9|    0|           -1|
|2018-12-06|      10|    0|           -1|
|2018-12-06|      11|    0|           -1|
|2018-12-06|      12|    0|           -1|
|2018-12-06|      13|    0|           -1|
|2018-12-06|      14|    0|           -1|
|2018-12-06|      15|    0|           -1|
|2018-12-06|      16|    0|           -1|
|2018-12-06|      17|    0|           -1|
|2018-12-06|      18|    0|           -1|
|2018-12-06|      19|    0|           -1|
|2018-12-06|      20|    0|           -1|
+----------+--------+-----+-------

                                                                                

In [8]:
import socket
import time
import pandas as pd
import glob

def read_data(directory):
    # Get a list of all CSV files in the specified directory
    all_files = glob.glob(directory + "/*.csv")

    data_frames = []
    for file_Name in all_files:
        # Read each CSV file as a DataFrame
        df = pd.read_csv(file_Name, index_col=None, header=0)

        # Extract the sensor ID from the filename
        sensor_id = file_Name.split('/')[-1].split('_')[0]
        # Rename the 'Count' and 'Average speed' columns with the sensor ID prefix
        df = df.rename(columns={'Count': f'Count_{sensor_id}', 'Average speed': f'Average_speed_{sensor_id}'})
        
        # Drop the 'Sensor' column if it exists
        if 'Sensor' in df.columns:
            df.drop('Sensor', axis=1, inplace=True)

        data_frames.append(df)

    # Merge all the DataFrames on the 'Date' and 'Time gap' columns
    data = data_frames.pop(0)
    for df in data_frames:
        data = pd.merge(data, df, on=['Date', 'Time gap'], how='outer')

    # Convert the 'Date' column to datetime type
    data["Date"] = pd.to_datetime(data["Date"])
    return data

def send_data(data, sock, delta, pi):
    current_date = pd.Timestamp("2018-12-06")
    end_date = pd.Timestamp("2023-03-31")
    time_period = pd.Timedelta(days=pi)

    # Send data in batches based on time intervals
    while current_date <= end_date:
        next_date = time_period + current_date
        mask = (data["Date"] < next_date) & (data["Date"] >= current_date)
        batch_data = data.loc[mask]

        print(f"Sending data for period: {current_date} - {next_date}")
        print(batch_data.head())

        # Convert the batch data to CSV format and send it over the socket connection
        batch_str = batch_data.to_csv(index=False)
        sock.sendall(batch_str.encode("utf-8"))

        current_date = next_date
        time.sleep(delta)

def main(directory, host, port, delta, pi):
    # Read the data from CSV files in the specified directory
    data = read_data(directory)

    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, port))
        sock.listen(1)
        print(f"Producer listening on {host}:{port}")

        # Accept a connection from the consumer
        conn, addr = sock.accept()
        print(f"Producer connected to {addr}")

        # Send the data to the consumer in batches
        send_data(data, conn, delta, pi)


if __name__ == "__main__":
    directory = r"/home/vladi/Bureau/BigData/Project_1"
    host = "localhost"
    port = 4545
    delta = 60  # seconds
    pi = 30  # days

    main(directory, host, port, delta, pi)

Producer listening on localhost:4545
Producer connected to ('127.0.0.1', 46592)
Sending data for period: 2018-12-06 00:00:00 - 2019-01-05 00:00:00
        Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
0 2018-12-06         1            0                   -1             0  \
1 2018-12-06         2            0                   -1             0   
2 2018-12-06         3            0                   -1             0   
3 2018-12-06         4            0                   -1             0   
4 2018-12-06         5            0                   -1             0   

   Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
0                    -1             0                    -1             0  \
1                    -1             0                    -1             0   
2                    -1             0                    -1             0   
3                    -1             0                    -1             0   
4                    -1

Sending data for period: 2019-04-05 00:00:00 - 2019-05-05 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
11520 2019-04-05         1            0                   -1             0  \
11521 2019-04-05         2            0                   -1             0   
11522 2019-04-05         3            0                   -1             0   
11523 2019-04-05         4            0                   -1             0   
11524 2019-04-05         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
11520                    -1             0                    -1             2  \
11521                    -1             0                    -1             1   
11522                    -1             0                    -1             1   
11523                    -1             0                    -1             6   
11524                    -1             0                  

Sending data for period: 2019-08-03 00:00:00 - 2019-09-02 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
23040 2019-08-03         1            0                   -1             0  \
23041 2019-08-03         2            0                   -1             0   
23042 2019-08-03         3            0                   -1             0   
23043 2019-08-03         4            0                   -1             0   
23044 2019-08-03         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
23040                    -1             0                    -1             1  \
23041                    -1             0                    -1             8   
23042                    -1             0                    -1             2   
23043                    -1             0                    -1             2   
23044                    -1             0                  

Sending data for period: 2019-12-01 00:00:00 - 2019-12-31 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
34560 2019-12-01         1            0                   -1             0  \
34561 2019-12-01         2            0                   -1             0   
34562 2019-12-01         3            0                   -1             0   
34563 2019-12-01         4            0                   -1             0   
34564 2019-12-01         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
34560                    -1             0                    -1             8  \
34561                    -1             0                    -1             6   
34562                    -1             0                    -1             3   
34563                    -1             0                    -1             2   
34564                    -1             0                  

Sending data for period: 2020-03-30 00:00:00 - 2020-04-29 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
46080 2020-03-30         1            0                   -1             0  \
46081 2020-03-30         2            0                   -1             0   
46082 2020-03-30         3            0                   -1             0   
46083 2020-03-30         4            0                   -1             0   
46084 2020-03-30         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
46080                    -1             0                    -1             0  \
46081                    -1             0                    -1             0   
46082                    -1             0                    -1             0   
46083                    -1             0                    -1             0   
46084                    -1             0                  

Sending data for period: 2020-07-28 00:00:00 - 2020-08-27 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
57600 2020-07-28         1            0                   -1             0  \
57601 2020-07-28         2            0                   -1             2   
57602 2020-07-28         3            0                   -1             0   
57603 2020-07-28         4            0                   -1             0   
57604 2020-07-28         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
57600                    -1             0                    -1             0  \
57601                    21             0                    -1             0   
57602                    -1             0                    -1             1   
57603                    -1             0                    -1             0   
57604                    -1             0                  

Sending data for period: 2020-11-25 00:00:00 - 2020-12-25 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
69120 2020-11-25         1            1                   14             0  \
69121 2020-11-25         2            0                   -1             0   
69122 2020-11-25         3            0                   -1             0   
69123 2020-11-25         4            0                   -1             0   
69124 2020-11-25         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
69120                    -1             0                    -1             0  \
69121                    -1             0                    -1             0   
69122                    -1             1                    21             1   
69123                    -1             1                     8             0   
69124                    -1             0                  

Sending data for period: 2021-03-25 00:00:00 - 2021-04-24 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
80640 2021-03-25         1            0                   -1             0  \
80641 2021-03-25         2            0                   -1             0   
80642 2021-03-25         3            0                   -1             0   
80643 2021-03-25         4            0                   -1             0   
80644 2021-03-25         5            0                   -1             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
80640                    -1             0                    -1             0  \
80641                    -1             0                    -1             0   
80642                    -1             0                    -1             0   
80643                    -1             0                    -1             0   
80644                    -1             0                  

Sending data for period: 2021-07-23 00:00:00 - 2021-08-22 00:00:00
            Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
92160 2021-07-23         1            0                   -1             0  \
92161 2021-07-23         2            1                   36             0   
92162 2021-07-23         3            0                   -1             0   
92163 2021-07-23         4            0                   -1             1   
92164 2021-07-23         5            2                   34             0   

       Average_speed_CVT387  Count_CB1699  Average_speed_CB1699  Count_CEK049   
92160                    -1             2                    19             7  \
92161                    -1             2                    22             3   
92162                    -1             1                    14             2   
92163                    12             3                    20             2   
92164                    -1             0                  

Sending data for period: 2021-11-20 00:00:00 - 2021-12-20 00:00:00
             Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
103680 2021-11-20         1            2                   15             0  \
103681 2021-11-20         2            2                   21             0   
103682 2021-11-20         3            2                   20             1   
103683 2021-11-20         4            1                   44             0   
103684 2021-11-20         5            0                   -1             1   

        Average_speed_CVT387  Count_CB1699  Average_speed_CB1699   
103680                    -1             5                    19  \
103681                    -1             0                    -1   
103682                    17             3                    19   
103683                    -1             4                    20   
103684                    16             1                    19   

        Count_CEK049  Average_speed_CEK049  ...  

Sending data for period: 2022-03-20 00:00:00 - 2022-04-19 00:00:00
             Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
115200 2022-03-20         1            3                   12             0  \
115201 2022-03-20         2            0                   -1             0   
115202 2022-03-20         3            0                   -1             3   
115203 2022-03-20         4            1                   14             0   
115204 2022-03-20         5            4                   13             0   

        Average_speed_CVT387  Count_CB1699  Average_speed_CB1699   
115200                    -1             2                    38  \
115201                    -1             0                    -1   
115202                    16             1                    18   
115203                    -1             2                    16   
115204                    -1             0                    -1   

        Count_CEK049  Average_speed_CEK049  ...  

Sending data for period: 2022-07-18 00:00:00 - 2022-08-17 00:00:00
             Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
126720 2022-07-18         1            0                   -1             0  \
126721 2022-07-18         2            0                   -1             0   
126722 2022-07-18         3            0                   -1             0   
126723 2022-07-18         4            3                   21             0   
126724 2022-07-18         5            1                   19             0   

        Average_speed_CVT387  Count_CB1699  Average_speed_CB1699   
126720                    -1             2                    16  \
126721                    -1             0                    -1   
126722                    -1             0                    -1   
126723                    -1             0                    -1   
126724                    -1             0                    -1   

        Count_CEK049  Average_speed_CEK049  ...  

Sending data for period: 2022-11-15 00:00:00 - 2022-12-15 00:00:00
             Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
138240 2022-11-15         1            0                   -1             1  \
138241 2022-11-15         2            0                   -1             0   
138242 2022-11-15         3            1                   22             0   
138243 2022-11-15         4            1                   17             0   
138244 2022-11-15         5            0                   -1             1   

        Average_speed_CVT387  Count_CB1699  Average_speed_CB1699   
138240                    22             0                    -1  \
138241                    -1             1                    21   
138242                    -1             0                    -1   
138243                    -1             2                    18   
138244                    20             0                    -1   

        Count_CEK049  Average_speed_CEK049  ...  

Sending data for period: 2023-03-15 00:00:00 - 2023-04-14 00:00:00
             Date  Time gap  Count_CAT17  Average_speed_CAT17  Count_CVT387   
149760 2023-03-15         1            1                   11             0  \
149761 2023-03-15         2            2                   16             0   
149762 2023-03-15         3            0                   -1             0   
149763 2023-03-15         4            0                   -1             1   
149764 2023-03-15         5            0                   -1             0   

        Average_speed_CVT387  Count_CB1699  Average_speed_CB1699   
149760                    -1             0                    -1  \
149761                    -1             0                    -1   
149762                    -1             0                    -1   
149763                    15             0                    -1   
149764                    -1             0                    -1   

        Count_CEK049  Average_speed_CEK049  ...  