# Homework 2

**Name:** Oscar Beltran Villegas

**e-mail:** oscar.beltran7944@alumnos.udg.mx

# MODULES

In [2]:

import math
import pandas as pd
import numpy as np
from scipy.stats import cauchy, levy_stable
from scipy.spatial import distance
import plotly.graph_objects as go

# CLASSES

In [3]:
################# http://www.pygame.org/wiki/2DVectorClass ##################
class Vec2d(object):
    """2d vector class, supports vector and scalar operators,
       and also provides a bunch of high level functions
       """
    __slots__ = ['x', 'y']

    def __init__(self, x_or_pair, y=None):
        if y is None:
            self.x = x_or_pair[0]
            self.y = x_or_pair[1]
        else:
            self.x = x_or_pair
            self.y = y

    # Addition
    def __add__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x + other.x, self.y + other.y)
        elif hasattr(other, "__getitem__"):
            return Vec2d(self.x + other[0], self.y + other[1])
        else:
            return Vec2d(self.x + other, self.y + other)

    # Subtraction
    def __sub__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x - other.x, self.y - other.y)
        elif hasattr(other, "__getitem__"):
            return Vec2d(self.x - other[0], self.y - other[1])
        else:
            return Vec2d(self.x - other, self.y - other)

    # Vector length
    def get_length(self):
        return math.sqrt(self.x**2 + self.y**2)

    # Rotate vector
    def rotated(self, angle):
        cos_theta = math.cos(angle)
        sin_theta = math.sin(angle)
        x = self.x * cos_theta - self.y * sin_theta
        y = self.x * sin_theta + self.y * cos_theta
        return Vec2d(x, y)

**Activity 1:** Path length - (BM1 vs BM2 vs CRW) (4 pts)
- Write a function that returns a **Brownian Motion (BM)** trajectory in **pandas** df.
- Write a function that returns a **Correlated Random Walk (CRW)** trajectory in **pandas** df.
- Write a function that returns the **path length** for a given trajectory.
- Compare at least the **path length** of **three** trajectories as shown in the figure below.
- Display the results using **plotly**.

**FUNCTIONS**

In [7]:
#####################################################################################
# Brownian motion trajectoy
#####################################################################################
def bm_2d(n_steps=1000, speed=5, s_pos=[0,0]):
    """
    Arguments:
        n_steps: Number of steps
        speed: Speed of the particle
        s_pos: Initial position of the particle
    Returns:
        BM_2d_df: DataFrame with x and y positions of the particle in pandas DataFrame
    """
    # Init velocity vector
    velocity =Vec2d(speed,0)
    
    # Init DF
    BM_2d_df = pd.DataFrame(columns=['x_pos','y_pos'])    
    # Add initial position
    temp_df = pd.DataFrame([{'x_pos':s_pos[0], 'y_pos':s_pos[1]}])    
    BM_2d_df = pd.concat([BM_2d_df, temp_df], ignore_index=True)
    
    # Generate the trajectory
    for i in range(n_steps-1):        
        turn_angle = np.random.uniform(low=-np.pi, high=np.pi)
        velocity = velocity.rotated(turn_angle)
    
        temp_df = pd.DataFrame([{'x_pos':BM_2d_df.x_pos[i]+velocity.x, 'y_pos':BM_2d_df.y_pos[i]+velocity.y}])    
        BM_2d_df = pd.concat([BM_2d_df, temp_df], ignore_index=True)
        
    return BM_2d_df


#####################################################################################
# Correlated random walk trajectory
#####################################################################################
def crw_2d(n_steps=1000, speed=5, coefficient=0.9, s_pos=[0,0]):
    """
    Arguments:
        n_steps: Number of steps
        speed: Speed of the particle
        coefficient: Cauchy distribution coefficient
        s_pos: Initial position of the particle
    Returns:
        CRW_2d_df: DataFrame with x and y positions of the particle in pandas DataFrame
    """
    # Init velocity vector
    velocity = Vec2d(speed, 0)

    # Init DF
    CRW_2d_df = pd.DataFrame(columns=['x_pos', 'y_pos'])
    # Add initial position
    temp_df = pd.DataFrame([{'x_pos': s_pos[0], 'y_pos': s_pos[1]}])
    CRW_2d_df = pd.concat([CRW_2d_df, temp_df], ignore_index=True)
    
    # Generate the trajectory
    for i in range(n_steps-1):
        turn_angle = cauchy.rvs(scale=coefficient)
        velocity = velocity.rotated(turn_angle)
    
        temp_df = pd.DataFrame([{'x_pos': CRW_2d_df.x_pos[i] + velocity.x, 'y_pos': CRW_2d_df.y_pos[i] + velocity.y}])
        CRW_2d_df = pd.concat([CRW_2d_df, temp_df], ignore_index=True)
    
    return CRW_2d_df

#####################################################################################
# Path length for a given trajectory.
#####################################################################################
def path_length_trajectory(df):
    """
    Arguments:
        df: DataFrame with x and y positions of the particle in pandas DataFrame
    Returns:
        pl: DataFrame of path length
    """
    # Init DF
    pl = pd.DataFrame({'y_pos': [0]})

    # calculate the distance between each point
    distances = [distance.euclidean(df.iloc[i], df.iloc[i + 1]) for i in range(len(df) - 1)]
    pl = pd.DataFrame({'y_pos': np.insert(np.cumsum(distances), 0, 0)})
        
    return pl


**Compare the path length of the three trajectories Display the results using plotly.**

In [10]:
# Generate trajectories
n_steps = 1000 # Number of steps
bm_traj_3 = bm_2d(n_steps, speed=3) # Brownian motion with speed 3
bm_traj_6 = bm_2d(n_steps, speed=6) # Brownian motion with speed 6
crw_traj_6 = crw_2d(n_steps, speed=6) # Correlated random walk with speed 6

bm_pl_3 = path_length_trajectory(bm_traj_3) # Path length for BM 3
bm_pl_6 = path_length_trajectory(bm_traj_6) # Path length for BM 6
crw_pl_6 = path_length_trajectory(crw_traj_6) # Path length for CRW 6

fig = go.Figure()


# Add traces for the trajectories
fig.add_trace(go.Scatter(x=bm_pl_3.index, y=bm_pl_3.y_pos, mode='lines', name='path length BM 3'))
fig.add_trace(go.Scatter(x=bm_pl_6.index, y=bm_pl_6.y_pos, mode='lines', name='path length BM 6', line=dict(width=8)))
fig.add_trace(go.Scatter(x=crw_pl_6.index, y=crw_pl_6.y_pos, mode='lines', name='path length CRW 6'))

fig.show()



**Activity 2**: Mean Squared Displacement - (BM vs CRW) (4 pts)
- Write a function that returns the **mean squared displacement** for a given trajectory.
- Compare the **mean squared displacement** curves of at least two trajectories of
different kinds, as shown in the figure below.
- Display the results using **plotly**.

**FUNCTIONS**

In [6]:
#####################################################################################
# Mean squared displacement for a given trajectory.
#####################################################################################
def msd(df_traj):
    """
    Arguments:
        df_traj: DataFrame with x and y positions of the trajectory in pandas DataFrame
    Returns:
        df_msd: DataFrame with mean squared displacement
    """
    # Convert DataFrame to NumPy array (for faster calculations)
    traj = df_traj[['x_pos', 'y_pos']].to_numpy().astype(float)
    
    # Init MSD array with zeros wich will be filled with values
    msd_values = np.zeros(len(traj))
    
    
    for n in range(1, len(traj)):
        displacements = traj[n:] - traj[:-n] # Calculate the displacements for each time lag
        squared_displacements = np.sum(displacements**2, axis=1) # Calculate the squared displacements
        msd_values[n] = np.mean(squared_displacements) # Calculate the mean squared displacement
    
    # Convert the result back to pandas DataFrame
    df_msd = pd.DataFrame({'y_pos': msd_values})
    
    return df_msd

Comparison of mean square displacement curves of BM 3, BM 6 and CRW 6. And results shown using plotly.

In [11]:
bm3_msd_df = msd(bm_traj_3) # MSD for BM 3
bm6_msd_df = msd(bm_traj_6) # MSD for BM 6
crw_msd_df = msd(crw_traj_6) # MSD for CRW 6, 0.9 coefficient per default

fig = go.Figure()

# Add traces for MSD of the Brownian motion 3 trajectory
fig.add_trace(go.Scatter(
    x= bm3_msd_df.index,
    y= bm3_msd_df.y_pos,
    name='MSD BM 3',
    mode='lines',
    showlegend=True
))

# Add traces for MSD of the Brownian motion 6 trajectory
fig.add_trace(go.Scatter(
    x= bm6_msd_df.index,
    y= bm6_msd_df.y_pos,
    name='MSD BM 6',
    mode='lines',
    showlegend=True
))

# Add traces for MSD of the Correlated Random Walk 6 trajectory
fig.add_trace(go.Scatter(
    x= crw_msd_df.index,
    y= crw_msd_df.y_pos,
    name='MSD CRW 6 c=0.9',
    mode='lines',
    showlegend=True
))

fig.show()

**Activity 3**: Turning-angle Distribution - (source dist. vs observed dist.) (6 pts)
- Consider two **CRW** trajectories with different **Cauchy coefficients**.
- Write a function that returns the **turning angles** for a given trajectory.
- Compare the observed distribution (histogram) to the source distribution (**curve**) for
both trajectories, as shown in the figure below.
- Display the results using **plotly**.

In [12]:
def calculate_turning_angles(trajectory_df):
    """
    Calculate turning angles from a trajectory.
    
    Arguments:
        trajectory_df: DataFrame with x_pos and y_pos columns
    Returns:
        turning_angles: List of turning angles in radians
    """
    # Extract positions
    x = trajectory_df['x_pos'].values
    y = trajectory_df['y_pos'].values
    
    # Calculate vectors between consecutive positions
    vectors = []
    for i in range(len(x)-1):
        dx = x[i+1] - x[i]
        dy = y[i+1] - y[i]
        vectors.append([dx, dy])
    
    # Calculate turning angles using the dot product and cross product formulas
    turning_angles = []
    for i in range(len(vectors)-1):
        a = vectors[i]
        b = vectors[i+1]
        
        # Calculate dot product
        dot_product = a[0]*b[0] + a[1]*b[1]
        
        # Calculate magnitudes
        mag_a = np.sqrt(a[0]**2 + a[1]**2)
        mag_b = np.sqrt(b[0]**2 + b[1]**2)
        
        # Calculate cross product (for direction)
        cross_product = a[0]*b[1] - a[1]*b[0]
        
        # Calculate angle using arccos of dot product formula
        angle = np.arccos(np.clip(dot_product / (mag_a * mag_b), -1.0, 1.0))
        
        # Determine the sign of the angle using cross product
        if cross_product < 0:
            angle = -angle
            
        turning_angles.append(angle) # Append the angle to the list
    
    return turning_angles

In [14]:

#####################################################################################
# Function to compare turning angle distributions for multiple CRW trajectories
#####################################################################################
def compare_turning_angle_distributions(coefficients=[0.6, 0.9], n_steps=5000):
    """
    Compare turning angle distributions for multiple CRW trajectories with different Cauchy coefficients.
    
    Arguments:
        coefficients: List of Cauchy coefficients to compare
        
    Returns:
        fig: Plotly figure object with the comparison
    """
    
    # Define colors for the plot (observed and Cauchy)
    colors = {
        'observed': [
            'rgba(100, 149, 237, 0.6)',  # Blue
            'rgba(255, 99, 71, 0.6)',    # Red
            'rgba(60, 179, 113, 0.6)',   # Green
            'rgba(255, 165, 0, 0.6)',    # Orange
            'rgba(186, 85, 211, 0.6)'    # Purple
        ],
        'cauchy': [
            'blue',
            'red',
            'green',
            'orange',
            'purple'
        ]
    }
    
    fig = go.Figure() # Create a new figure
    
    # Process each coefficient
    for i, coef in enumerate(coefficients):
        # Generate trajectory for each coefficient
        trajectory = crw_2d(n_steps=n_steps, speed=1, coefficient=coef)
        
        # Calculate turning angles
        turning_angles = calculate_turning_angles(trajectory)
        
        # Create histogram of turning angles
        hist = np.histogram(turning_angles, bins=100, range=(-3, 3), density=True)
        
        # Add histogram
        fig.add_trace(
            go.Bar(
            x=[(hist[1][i] + hist[1][i+1])/2 for i in range(len(hist[1])-1)],
            y=hist[0],
            name=f'observed_{coef}',
            marker_color=colors['observed'][i % len(colors['observed'])],
            width=(hist[1][1] - hist[1][0]) * 0.5,  # Make the bars thinner
            )
        )
        
        # Generate x values for the Cauchy distribution
        x = np.linspace(-3, 3, 5000)
        
        # Add Cauchy distribution curve
        y_cauchy = cauchy.pdf(x, scale=coef)
        fig.add_trace(
            go.Scatter(
                x=x,
                y=y_cauchy,
                mode='lines',
                name=f'cauchy_{coef}',
                line=dict(color=colors['cauchy'][i % len(colors['cauchy'])], width=2),
            )
        )
    
    # ajust the layout of the plot
    fig.update_xaxes(range=[-3, 3])
    fig.update_yaxes(range=[0, 3.5])
    
    return fig

In [15]:
# Compare turning angle distributions for different Cauchy coefficients
fig = compare_turning_angle_distributions(coefficients=[0.6, 0.1, 0.9], n_steps=5000)
fig.show()

**Activity 4**: Step-length Distribution - (source dist. vs observed dist.) (6 pts)
- Write a function that returns a **Lévy Walk (LW)** trajectory in **pandas** df.
- Consider two **LW** trajectories with different **alpha coefficients**.
- Write a function that restaurants the **step lengths** for a given trajectory.
- Compare the observed distribution (**histogram**) to the source distribution (**curve**) for
both trajectories, as shown in the figure below.
- Display the results using **plotly**.

**FUNCTIONS**

In [16]:
#####################################################################################
# Generate a Lévy Walk trajectory
#####################################################################################
def generate_levy_walk(n_steps=1000, alpha=1.0, beta=1.0):
    """
    Generate a Lévy Walk trajectory.
    
    Arguments
    n_steps : Number of steps in the trajectory
    alpha : Lévy distribution parameter controlling the tail heaviness (0 < alpha <= 2)
    beta : Lévy distribution parameter controlling the skewness (-1 <= beta <= 1)
    
    Returns:
    df : DataFrame containing the trajectory coordinates (x, y) and step information
    """

    
    # Generate step lengths from Lévy stable distribution
    step_lengths = levy_stable.rvs(alpha=alpha, beta=beta, size=n_steps)
    step_lengths = np.abs(step_lengths)  # Ensure positive step lengths
    
    # Generate random angles
    angles = np.random.uniform(0, 2*np.pi, n_steps)
    velocity = Vec2d(5, 0) 

    # Generate trajectory from step lengths and angles
    trajectory = np.zeros((n_steps, 3))

    for i in range(1, n_steps):

        # Update velocity vector
        velocity = velocity.rotated(angles[i])

        # Calculate step vector
        trajectory[i, 0] = velocity.x * step_lengths[i]
        trajectory[i, 1] = velocity.y * step_lengths[i]
        trajectory[i, 2] = step_lengths[i]
    
    # Create pandas DataFrame from trajectory
    df = pd.DataFrame({
        'x': trajectory[:, 0], 
        'y': trajectory[:, 1], 
        'step_length': trajectory[:, 2], 
    })
    
    return df

#####################################################################################
# Calculate step lengths from a trajectory DataFrame
#####################################################################################
def calculate_step_lengths(trajectory_df):
    """
    Calculate step lengths from a trajectory DataFrame.
    
    Arguments:
    trajectory_df : DataFrame containing the trajectory coordinates (x, y) and step information

    Returns:
    step_lengths : Numpy array containing the step lengths
    """
    # Get coordinates
    x = trajectory_df['x'].values
    y = trajectory_df['y'].values
    
    # Calculate step vectors
    dx = np.diff(x)
    dy = np.diff(y)
    
    # Calculate step lengths
    step_lengths = np.sqrt(dx*2 + dy*2)
    
    return step_lengths

#####################################################################################
# Plot Lévy Walk trajectory and step length distribution
#####################################################################################
def levy_pdf(x, alpha, beta=1.0):
    """
    Calculate the probability density function of a Lévy stable distribution.
    This is an approximation for visualization purposes.
    
    Arguments:
    x : Points at which to evaluate the PDF
    alpha : Lévy distribution parameter
    beta : Lévy distribution parameter
    
    Returns:
        pdf_values : PDF values at the given points
    """
    return levy_stable.pdf(x, alpha=alpha, beta=beta)

#####################################################################################
# Plot Lévy Walk trajectory and step length distribution
#####################################################################################
def plot_levy_walk_comparison(n_steps=1000, alphas=[1.0, 0.7], beta=1.0):
    """
    Generate and plot Lévy Walk trajectories with different alpha values,
    comparing observed step length distributions to source distributions.
    
    Arguments:
    n_steps : Number of steps in each trajectory
    alphas : List of alpha values to compare
    beta : Beta parameter for Lévy stable distribution
    
    Returns:
    plotly.graph_objects.Figure
        Figure containing the comparison plot
    """
    # Define colors
    colors = ['purple', 'red']
    
    # Generate trajectories and calculate step lengths
    trajectories = []
    step_lengths = []
    
    # Generate trajectories for each alpha value
    for alpha in alphas:
        traj = generate_levy_walk(n_steps=n_steps, alpha=alpha, beta=beta) # Generate trajectory
        steps = traj['step_length'].values[1:]  # Skip the first (0) value
        
        trajectories.append(traj)
        step_lengths.append(steps)
    
    # Create figure
    fig = go.Figure()
    
    # Set x-range for PDF curve
    x_range = np.linspace(0, 50, 5000)
    
    # Add histograms and PDF curves for each alpha value
    for i, (alpha, steps) in enumerate(zip(alphas, step_lengths)):
        # Calculate histogram values
        hist_values, bin_edges = np.histogram(steps, bins=50, range=(0, 50), density=True)
        bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
        
        # Add histogram
        fig.add_trace(
            go.Bar(
                x=bin_centers,
                y=hist_values,
                name=f'Observed_alpha={alpha}_beta={beta}',
                marker_color=colors[i],
                opacity=0.5,
                width=(bin_edges[1] - bin_edges[0]) * 0.5
            )
        )
        
        # Calculate PDF values
        pdf_values = levy_pdf(x_range, alpha=alpha, beta=beta)
        
        # Add PDF curve
        fig.add_trace(
            go.Scatter(
                x=x_range,
                y=pdf_values,
                name=f'Levy_alpha={alpha}',
                line=dict(color=colors[i])
                
            )
        )
    
    # Set axis ranges
    fig.update_xaxes(range=[0, 50])
    fig.update_yaxes(range=[0, 0.35])
    
    return fig

In [17]:
# Execute the code and show results
fig = plot_levy_walk_comparison(n_steps=5000, alphas=[1.0, 0.7], beta=1.0)
fig.show()