Note :- Create a $copy$ of the notebook in your 'Personal Drive' and then run the .ipynb file.

In [None]:
pip install plotly

In [2]:
#import all the necessary libraries
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd
from scipy.optimize import curve_fit
import math
import plotly.graph_objs as go

In [None]:
"""
to print the dataframe as a string output :- print(df.to_string())
to convert the pandas datafram to numpy array :- points = df[["Density","Flow"]].to_numpy()
"""

d =  pd.read_csv('/content/Data/Zurich - Sheet1.csv')
df = pd.DataFrame(d, columns=['Density', 'Flow'])
points = df.to_numpy()

df.head()

In [4]:
#The following function splits the numpy ndarray into 2 1 dimensional arrays as the coordinates to the plot
def extract_coordinates(arr):
    return arr[:,0], arr[:,1]

In [None]:
'''
Note that here I have choosen to cut the dataset into two parts at density = 17.5 veh/km.
The only argument to support this choice of density is 
to reduce the std error while fitting the curve onto the dataset.
'''

#Plotting the data
x, y = extract_coordinates(points)

#overall graph
plt.scatter(x, y)
plt.show()

print('\n')

#first fiting line
plt.scatter(x, y)
plt.xlim((0, 17.5))
plt.show()

print('\n')

#second fitting line
plt.scatter(x, y)
plt.xlim((17.5, 100))
plt.show()

This block $splits$ the given data into two $manually$ $choosen$ halves so as to fit two curves

In [6]:
points1 = []
points2 = []
for i in points:
    if i[0] <= 17.5:
        points1.append(i)
    else:
        points2.append(i)
set1 = np.array(points1)
set2 = np.array(points2)

type(set2)
len(set1), len(set2)

(343, 136)

# Fitting a line to both parts of the data set using scipy curve_fit


In [None]:
'''
This is an alternative approach, 
leads to a higher variance for this case
Can be used but not recommended
'''
def fit_set1(x_s1, m, c):
    y = m*x_s1 + c
    return y

#Extracting the coordinates
x_s1, y_s1 = extract_coordinates(set1)

#using the curve fit library to fit the curve to the dataset
parameters_s1, covariance = curve_fit(fit_set1, x_s1, y_s1, p0= [32, 52])
m_s1 = parameters_s1[0]
c_s1 = parameters_s1[1]
print('m = ', m_s1, ' c = ', c_s1)
print(F'The equation is y = {m_s1:.5f}x + {c_s1:.5f}')

#plotting the fit
fit_y = fit_set1(x_s1, m_s1, c_s1)
plt.plot(x_s1, y_s1, 'o', label='data')
plt.plot(x_s1, fit_y, label='fit')
plt.legend()

#evaluating the error
SE = np.sqrt(np.diag(covariance))
SE_A = SE[0]
SE_B = SE[1]
print(F'The value of A is {m_s1:.5f} with standard error of {SE_A:.5f}.')
print(F'The value of B is {c_s1:.5f} with standard error of {SE_B:.5f}.')

y_predicted = fit_set1(x_s1, m_s1, c_s1)
var_cf1 = np.sum(np.square(y_s1 - y_predicted))
print("The variance observed in this curve is: ", var_cf1)

In [None]:
def fit_set2(x_s2, m, c):
    y_s2 = m*x_s2 + c
    return y_s2

#Extracting the coordinates
x_s2, y_s2 = extract_coordinates(set2)

#using the curve fit library to fit the curve to the dataset
parameters_s2, covariance = curve_fit(fit_set2, x_s2, y_s2)
m_s2 = parameters_s2[0]
c_s2 = parameters_s2[1]
print('m = ', m_s2, ' c = ', c_s2)
print(F'The equation is y = {m_s2:.5f}x**p + {c_s2:.5f}')

#plotting the fit
fit_y = fit_set2(x_s2, m_s2, c_s2)
plt.plot(x_s2, y_s2, 'o', label='data')
plt.plot(x_s2, fit_y, label='fit')
plt.legend()

#evaluating the error
SE = np.sqrt(np.diag(covariance))
SE_A = SE[0]
SE_B = SE[1]
print(F'The value of A is {m_s2:.5f} with standard error of {SE_A:.5f}.')
print(F'The value of B is {c_s2:.5f} with standard error of {SE_B:.5f}.')

y_predicted2 = fit_set2(x_s2, m_s2, c_s2)
var_cf2 = np.sum(np.square(y_s2 - y_predicted2))
print("The variance observed in this curve is: ", var_cf2)

# Fitting the data set using numpy polyfit :- $degree$ $2$ $polynomial$

In [None]:
#The ployfit library in python is used to fit a degree 2 polynomial to the given dataset
#The fit looks better that curve_fit and has lesser variance than earlier fir

xp, yp = extract_coordinates(set1)
trend = np.polyfit(xp, yp, 2)
print("The Parameters of the quadratic are :- ", trend)
plt.plot(xp, yp,'o', label='data')
trendpoly = np.poly1d(trend)
plt.plot(xp, trendpoly(xp), label='trend')
plt.legend()

def f_set1(xp):
    y = trend[0]*(xp**2) + trend[1]*xp + trend[2]
    return y
y_predicted = f_set1(xp)
variance1 = np.sum(np.square(yp - y_predicted))
print("The variance observed in this curve is: ", variance1)

In [None]:
xp2, yp2 = extract_coordinates(set2)
trend2 = np.polyfit(xp2, yp2, 2)
print("The Parameters of the quadratic are :- ", trend2)
plt.plot(xp2, yp2,'o', label='data')
trendpoly = np.poly1d(trend2)
plt.plot(xp2, trendpoly(xp2), label='trend2')
plt.legend()

def f_set2(xp2):
    y = trend2[0]*(xp2**2) + trend2[1]*xp2 + trend2[2]
    return y
y_predicted2 = f_set2(xp2)
variance2 = np.sum(np.square(yp2 - y_predicted2))
print("The variance observed in this curve is: ", variance2)

Using $curve$_$fit$ and $polyfit$ we can see that the variance is **Lesser** when we try and implement $POLYFIT$, 
so I have implemented the polyfit equations in the lax-friedrichs scheme

Applying $LAX$ $FRIEDRICHS$ $SCHEME$ to the space time diagram to simulate the traffic propagation on the road length

In [11]:
'''
initially we need to define the initial conditions and the boundary conditions
Here I am rewriting the cell st_curve[0][0] to have 0 density assuming the boundary condition to be true
This rewriting is done due to ambiguity that there are 2 values of density assigned to the same cell.
'''
st_curve = np.zeros(shape=(200,1500), dtype=float, order='C')
# depending upon the initial condition the FIRST COLUMN in the st_curve matrix is filled
for i in range(200):
    if i > 100:
        st_curve[i][0] = 50
    elif i <= 100 and i >= 50:
        st_curve[i][0] = 350
    else:
        i_y = 7*i
        st_curve[i][0] = i_y

# depending upon the boundary condition the LAST ROW in the st_curve matrix is filled
for i in range(1500):
    if i > 333:
        st_curve[199][i] = 50
    elif i > 166 and i < 333:
        st_curve[199][i] = 75
    else:
        st_curve[199][i] = 0

In [12]:
'''
After that we apply the formula over the space-time plot to calculate the propogation of the traffic throughout space-time
here depending upon the density at each location if it is more than 17.5 veh/km we call the second function that we fit earlier
else we fit the first function.
By first and second function I mean, the first fitted part and the second fitted part of the given dataset
'''
val1 = 0
val2 = 0

for i in range(1, 1500):
    for j in range(198, -1, -1):
        if st_curve[j+1][i-1] > 17.5:
            val1 = f_set2(st_curve[j-1][i-1])
        elif st_curve[j+1][i-1] <= 17.5:
            val1 = f_set1(st_curve[j-1][i-1])
        
        if st_curve[j-1][i-1] > 17.5:
            val2 = f_set2(st_curve[j+1][i-1])
        elif st_curve[j-1][i-1] <= 17.5:
            val2 = f_set1(st_curve[j+1][i-1])

        term1 = ((st_curve[j+1][i-1] + st_curve[j-1][i-1])/2)

        term2 = (val1 - val2)

        st_curve[j][i] = term1 - 0.005*term2

In [13]:
#This is convert a matrix to csv file to manually observe the propagation of traffic
import csv

with open('lax_friedrichs.csv','w') as f:
    writer = csv.writer(f)
    writer.writerows(st_curve)

In [None]:
# Generate x, y, and z coordinates
x = np.arange(st_curve.shape[1])  # space dimension
y = np.arange(st_curve.shape[0])  # time dimension
X, Y = np.meshgrid(x, y)
Z = st_curve

# Create trace
trace = go.Surface(
    x=X,
    y=Y,
    z=Z,
)

# Create layout
layout = go.Layout(
    scene=dict(
        xaxis_title='Time',
        yaxis_title='Space',
        zaxis_title='Density',
    ),
)

# Create figure
fig = go.Figure(data=[trace], layout=layout)

# Show figure
fig.show()

In [None]:
# assume you have the following variables defined:
# st_curve: the space-time density matrix
# time_index: the index of the desired time instance

#store the required time instances in an array
time_inst = [165, 499, 999, 1499]

for i in time_inst:
    # extract the density values at the desired time instance
    density_curve = st_curve[:, i]

    # create a scatter plot of the density curve
    fig = go.Figure(data=[go.Scatter(x=np.arange(len(density_curve)), y=density_curve)])

    # customize the plot layout
    fig.update_layout(
        title=f"Density Variation with Space at Time = {i}",
        xaxis_title="Space",
        yaxis_title="Density",
    )

    # show the plot
    fig.show()

    print('\n')