# Homework 3

**Name:** Omar Alejandro Guzmán Munguía

**e-mail:** omar.guzman5063@alumnos.udg.mx

# MODULES

In [251]:
# Load modules
import numpy as np
import math

import plotly.graph_objects as go

from scipy.stats import exponweib
from scipy.stats import levy_stable
from scipy.spatial import distance

import pandas as pd

# CLASSES

In [252]:
################# http://www.pygame.org/wiki/2DVectorClass ##################
class Vec2d(object):
    """2d vector class, supports vector and scalar operators,
       and also provides a bunch of high level functions
       """
    __slots__ = ['x', 'y']

    def __init__(self, x_or_pair, y = None):
        if y == None:            
            self.x = x_or_pair[0]
            self.y = x_or_pair[1]
        else:
            self.x = x_or_pair
            self.y = y
            
    # Addition
    def __add__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x + other.x, self.y + other.y)
        elif hasattr(other, "__getitem__"):
            return Vec2d(self.x + other[0], self.y + other[1])
        else:
            return Vec2d(self.x + other, self.y + other)

    # Subtraction
    def __sub__(self, other):
        if isinstance(other, Vec2d):
            return Vec2d(self.x - other.x, self.y - other.y)
        elif (hasattr(other, "__getitem__")):
            return Vec2d(self.x - other[0], self.y - other[1])
        else:
            return Vec2d(self.x - other, self.y - other)
    
    # Vector length
    def get_length(self):
        return math.sqrt(self.x**2 + self.y**2)
    
    # rotate vector
    def rotated(self, angle):        
        cos = math.cos(angle)
        sin = math.sin(angle)
        x = self.x*cos - self.y*sin
        y = self.x*sin + self.y*cos
        return Vec2d(x, y)

# Activity 1: Path length - (BM1 vs BM2 vs CRW)

- Write a function that returns a **Brownian Motion (BM)** trajectory in **pandas** df.
- Write a function that returns a **Correlated Random Walk (CRW)** trajectory in **pandas** df.
- Write a function that returns the **path length** for a given trajectory.
- **Compare** at least the **path lengt** of **three** trajectories as shown in the figure below.
- Display the results using **plotly.**

# FUNCTIONS

In [253]:
#####################################################################################
# Brownian motion trajectoy
#####################################################################################
def bm_2d(n_steps=1000, speed=5, s_pos=[0,0]):
    """
    Arguments
    -----------
        n_steps: Number of steps in the trajectory.
        speed: Speed of the trajectory.
        s_pos: Starting position of the trajectory.

    Returns
    -----------
        BM_2d_df:  DataFrame with the x and y positions of the trajectory.
    """
    
    # Init velocity vector
    velocity =Vec2d(speed,0)
    
    # Init DF
    BM_2d_df = pd.DataFrame(columns=['x_pos','y_pos'])    
    # Add initial position
    temp_df = pd.DataFrame([{'x_pos':s_pos[0], 'y_pos':s_pos[1]}])    
    BM_2d_df = pd.concat([BM_2d_df, temp_df], ignore_index=True)
    
    # Generate the trajectory
    for i in range(n_steps-1):        
        turn_angle = np.random.uniform(low=-np.pi, high=np.pi)
        velocity = velocity.rotated(turn_angle)
    
        temp_df = pd.DataFrame([{'x_pos':BM_2d_df.x_pos[i]+velocity.x, 'y_pos':BM_2d_df.y_pos[i]+velocity.y}])    
        BM_2d_df = pd.concat([BM_2d_df, temp_df], ignore_index=True)
        
    return BM_2d_df

In [254]:
#####################################################################################
# Correlated Random Walk (CRW)
#####################################################################################
def crw_trajectory(n_steps=1000, speed=5, correlation_coeff=0.9, s_pos=0.0):
    """
    Arguments
    -----------
        n_steps: Number of steps in the trajectory.
        speed: Length of each step.
        correlation_coeff: Correlation coefficient (between -1 and 1) that controls the correlation between consecutive steps.
        s_pos: Initial direction of movement in radians (default is 0.0).

    Returns
    -----------
        crw_df: A pandas DataFrame with columns ['x', 'y'] representing the trajectory.
    """
    # Initialize the trajectory with the starting point at the origin
    trajectory = [Vec2d(0.0, 0.0)]
    
    # Initialize the current direction
    current_direction = s_pos
    
    for _ in range(n_steps):
        # Generate a random angle change based on the correlation coefficient
        angle_change = np.random.normal(0, np.sqrt(1 - correlation_coeff**2))
        
        # Update the current direction
        current_direction += angle_change
        
        # Calculate the next step vector
        next_step = Vec2d(speed, 0).rotated(current_direction)
        
        # Update the trajectory
        next_position = trajectory[-1] + next_step
        trajectory.append(next_position)
    
    # Convert the trajectory to a pandas DataFrame
    crw_df = pd.DataFrame([(pos.x, pos.y) for pos in trajectory], columns=['x', 'y'])
    
    
    return crw_df

In [255]:
#####################################################################################
# Path length
#####################################################################################
def path_length(trajectory_df, s_pos=[0]):
    """
    Arguments
    -----------
        trajectory_df: DataFrame with the x and y positions of the trajectory.
        s_pos: Starting position of the trajectory.

    Returns
    -----------
        path_length: Length of the trajectory.
    """
    # Initialize the path length
    path_length = pd.DataFrame(columns=['y_pos'])
    df = pd.DataFrame([{'y_pos':s_pos[0]}])

    path_length = pd.concat([path_length, df], ignore_index=True)

    for i in range(len(trajectory_df)-1):
        df = pd.DataFrame([{'y_pos':path_length.y_pos[i] + distance.euclidean(trajectory_df.iloc[i], trajectory_df.iloc[i+1])}])
        path_length = pd.concat([path_length, df], ignore_index=True)
        
    return path_length

# GENERATE TRAJECTORIES AND CALCULATE PATH LENGTH

In [256]:
# Generate trajectories
bm_traj_3 = bm_2d(speed=3)
bm_traj_6 = bm_2d(speed=6)
crw_traj_6 = crw_trajectory(speed=6)

# Calculate path lengths
bm_pl_3 = path_length(bm_traj_3)
bm_pl_6 = path_length(bm_traj_6)
crw_pl_6 = path_length(crw_traj_6)

# PLOTING

In [257]:
fig = go.Figure()

fig.add_trace(go.Scatter(
    x= bm_pl_3.index,
    y= bm_pl_3.y_pos,
    name='Path length - BM 3',
    mode='lines',
    showlegend=True
))

fig.add_trace(go.Scatter(
    x= bm_pl_6.index,
    y= bm_pl_6.y_pos,
    name='Path length - BM 6',
    marker = dict(size=2),
    line = dict(width=6),
    mode='lines',
    showlegend=True
))

fig.add_trace(go.Scatter(
    x= crw_pl_6.index,
    y= crw_pl_6.y_pos,
    name='Path length - CRW 6',
    mode='lines',
    showlegend=True
))

fig.update_layout(
    title='Path Lengths',
    height=800,
)

fig.show()

# Activity 2: Mean Squared Displacement - (BM vs CRW)

- Write a function that returns the **mean squared displacement** for a given trajectory.
- Compare the **mean squared displacement** curves of at least two trajectories of
different kinds, as shown in the figure below.
- Display the results using **plotly.**

# FUNCTIONS

In [258]:
#####################################################################################
# Mean Squared Displacement (MSD)
#####################################################################################
def msd(trajectory_df):
    """    
    Arguments
    -----------
        trajectory_df: DataFrame with columns 
    
    Returns
    -----------
        msd_df: DataFrame with columns ['tau', 'msd'] (time lag vs MSD).
    """
    x_col = 'x' if 'x' in trajectory_df.columns else 'x_pos'
    y_col = 'y' if 'y' in trajectory_df.columns else 'y_pos'
    
    # Extract coordinates
    x = trajectory_df[x_col].values
    y = trajectory_df[y_col].values
    n = len(x)
    
    msd_values = np.zeros(n - 1)  # Store MSD values
    taus = np.arange(1, n)  # All possible time lags
    
    for tau in taus:
        displacements = (x[tau:] - x[:-tau])**2 + (y[tau:] - y[:-tau])**2
        msd_values[tau - 1] = np.mean(displacements)  # Store MSD value for this tau
    
    return pd.DataFrame({'tau': taus, 'msd': msd_values})  

# CALCULATE MSD AND PLOTING

In [274]:
# Calculate MSDs
msd_bm = msd(bm_traj_6)
msd_crw = msd(crw_traj_6)

fig = go.Figure()
fig.add_trace(go.Scatter(x=msd_bm['tau'], y=msd_bm['msd'], mode='lines', name='MSD BM6'))
fig.add_trace(go.Scatter(x=msd_crw['tau'], y=msd_crw['msd'], mode='lines', name='MSD CRW6 c=0.9'))

fig.update_layout(
    title='Mean Squared Displacement - (BM vs CRW)',
    xaxis_title='Time',
    yaxis_title='MSD',
    hovermode='x unified',
    height=600
)
fig.show()

# Activity 3: Turning-angle Distribution - (source dist. vs observed dist.)

- Consider two **CRW** trajectories with different **Cauchy coefficients.**
- Write a function that returns the **turning angles** for a given trajectory.
- Compare the observed distribution **(histogram)** to the source distribution (**curve**) for
both trajectories, as shown in the figure below.
- Display the results using **plotly.**

# FUNCTIONS

In [260]:
#####################################################################################
# Turning angles
#####################################################################################
def turning_angles(trajectory_df):
    """    
    Arguments
    -----------
        trajectory_df: DataFrame with 'x' and 'y' columns
    
    Returns
    -----------
        numpy: Array of turning angles
    """
    x = trajectory_df['x'].values
    y = trajectory_df['y'].values
    
    # Calculate directions (angles) between consecutive points
    dx = np.diff(x)
    dy = np.diff(y)
    directions = np.arctan2(dy, dx)
    
    # Calculate turning angles (differences between consecutive directions)
    turning_angles = np.diff(directions)
    
    # Normalize angles to be between -π and π
    turning_angles = np.mod(turning_angles + np.pi, 2 * np.pi) - np.pi
    
    return turning_angles

In [261]:
#####################################################################################
# Cauchy distribution
#####################################################################################
def cauchy_pdf(x, gamma):
    """
    Arguments
    -----------
        x: Points at which to evaluate the PDF
        gamma: Scale parameter
    
    Returns
    -----------
        PDF values
    """
    return 1 / (np.pi * gamma * (1 + (x / gamma)**2))

In [None]:
#####################################################################################
# Plotting turning angle distributions
#####################################################################################
def plot_turning_angle_distributions(trajectories, coefficients, bins=50):
    """    
    Arguments
    -----------
        trajectories: List of trajectory DataFrames
        coefficients: List of correlation coefficients used to generate the trajectories
        bins: Number of bins for the histograms
    
    Returns
    -----------
        plotly.graph_objects.Figure: Figure with the distributions
    """
    colors = ['blue', 'red']
    
    # Create a figure
    fig = go.Figure()
    
    # Range for the x-axis
    x_range = np.linspace(-3, 3, 1000)
    
    for i, (trajectory, coef) in enumerate(zip(trajectories, coefficients)):
        # Extract turning angles
        t_angles = turning_angles(trajectory)
        
        # Compute histogram values for the observed distribution
        hist_values, bin_edges = np.histogram(t_angles, bins=bins*3, density=True, range=(-3, 3)) 
        bin_centers = (bin_edges[:-1] + bin_edges[1:]) / 2
        
        # Plot observed distribution (histogram)
        fig.add_trace(
            go.Bar(
                x=bin_centers,
                y=hist_values,
                name=f'observed_{coef}',
                marker_color=colors[i],
                opacity=0.5,
            )
        )
        
        gamma = np.sqrt(1 - coef**2)  # Standard deviation for the normal distribution
        
        # Convert normal distribution to match scale
        theory_values = 1/(gamma * np.sqrt(2 * np.pi)) * np.exp(-(x_range**2)/(2 * gamma**2))
        
        fig.add_trace(
            go.Scatter(
                x=x_range,
                y=theory_values,
                mode='lines',
                name=f'cauchy_{coef}', 
                line=dict(color=colors[i], width=2)
            )
        )
    
    # Update layout
    fig.update_layout(
        title='Turning-angle Distribution - Source vs Observed',
        legend_title='Distributions',
        height=600,
        plot_bgcolor='rgb(225, 235, 245)'
    )
    
    return fig

# GENERATE TRAJECTORIES AND PLOTING

In [276]:
cauchy_coefficients = [0.6, 0.9]

# Generate trajectories
trajectories = []
for coef in cauchy_coefficients:
    trajectory = crw_trajectory(n_steps=1000, correlation_coeff=coef)
    trajectories.append(trajectory)

# Plot the distributions
fig = plot_turning_angle_distributions(trajectories, cauchy_coefficients)
fig.show()

# Activity 4: Step-length Distribution - (source dist. vs observed dist.)

- Write a function that returns a **Lévy Walk (LW)** trajectory in **pandas** df.
- Consider two **LW** trajectories with different **alpha coefficients.**
- Write a function that restaurants the **step lengths** for a given trajectory.
- Compare the observed distribution (**histogram**) to the source distribution (**curve**) for
both trajectories, as shown in the figure below.
- Display the results using **plotly.**

# FUNCTIONS

In [264]:
#####################################################################################
# Levy Walk
#####################################################################################
def levy_walk(steps=1000, alpha=1.0, beta=1.0, scale=1.0, x0=0, y0=0):
    """
    Arguments
    -----------
        steps: Number of steps in the random walk trajectory
        alpha: Stability parameter that determines the shape/heaviness of the tails  and 2)
        beta: Skewness parameter of the distribution (float between -1 and 1)
        scale: Scale parameter that controls the width of the distribution (float > 0)
        x0: Initial x-coordinate position (float)
        y0: Initial y-coordinate position (float)
    
    Returns
    -----------
        result_df: Pandas DataFrame containing the trajectory coordinates (x,y), step lengths and angles
                  Columns: 'x', 'y', 'step_length', 'angle'
    """

    # Step lengths from a Lévy stable distribution
    step_lengths = levy_stable.rvs(alpha=alpha, beta=beta, loc=0, scale=scale, size=steps)
    step_lengths = np.abs(step_lengths)  # Take absolute values for physical distances
    
    # Random angles uniformly between 0 and 2π
    angles = np.random.uniform(0, 2*np.pi, steps)
    
    # Initialize arrays for coordinates
    x_coords = np.zeros(steps + 1)
    y_coords = np.zeros(steps + 1)
    
    # Set initial position
    x_coords[0] = x0
    y_coords[0] = y0
    
    # Calculate trajectory
    for i in range(steps):
        x_coords[i+1] = x_coords[i] + step_lengths[i] * np.cos(angles[i])
        y_coords[i+1] = y_coords[i] + step_lengths[i] * np.sin(angles[i])
    
    # Create the result DataFrame
    result_df = pd.DataFrame({
        'x': x_coords,
        'y': y_coords,
        'step_length': np.append(step_lengths, np.nan),
        'angle': np.append(angles, np.nan)
    })
    
    return result_df

In [265]:
#####################################################################################
# Levy PDF
#####################################################################################
def levy_pdf(x, alpha, beta=1.0, scale=1.0):
    """    
    Arguments
    -----------
        x: Point at which to evaluate the PDF (float > 0)
        alpha: Stability parameter that determines the shape/heaviness of the tails (float between 0 and 2)
        beta: Skewness parameter of the distribution (float between -1 and 1)
        scale: Scale parameter that controls the width of the distribution (float > 0)
    
    Returns
    -----------
        float: The PDF value at point x
    """

    # Estimate using direct calculation from the levy_stable PDF
    return levy_stable.pdf(x, alpha=alpha, beta=beta, scale=scale) + \
           levy_stable.pdf(-x, alpha=alpha, beta=beta, scale=scale)

In [281]:
#####################################################################################
# Plot Step Length Distributions
#####################################################################################
def plot_step_length_distributions(alpha1=1.0, alpha2=0.7, beta=1.0, scale=1.0, steps=10000, max_x=50, bins=50):
    """
    Arguments
    ----------
        alpha1: Stability parameter for first Lévy walk (between 0 and 2)
        alpha2: Stability parameter for second Lévy walk (between 0 and 2)
        beta: Skewness parameter (between -1 and 1)
        scale: Scale parameter controlling width of distribution
        steps: Number of steps in each walk
        max_x: Maximum x-value for plotting
        bins: Number of histogram bins
        
    Returns
    -------
        plotly.graph_objects.Figure: figure comparing the step length distributions
    """

    # Generate trajectories
    levy_traj_1 = levy_walk(steps=steps, alpha=alpha1, beta=beta, scale=scale)
    levy_traj_07 = levy_walk(steps=steps, alpha=alpha2, beta=beta, scale=scale)
    
    # Extract step lengths
    step_lengths1 = levy_traj_1['step_length'].dropna().values
    step_lengths2 = levy_traj_07['step_length'].dropna().values
    
    # Create histogram bins
    bin_edges = np.linspace(0, max_x, bins+1)
    bin_centers = 0.5 * (bin_edges[1:] + bin_edges[:-1])
    
    # Calculate histograms
    hist1, _ = np.histogram(step_lengths1, bins=bin_edges, density=True)
    hist2, _ = np.histogram(step_lengths2, bins=bin_edges, density=True)
    
    # Generate x values for the theoretical PDF curves
    x_vals = np.linspace(0.1, max_x, 1000)
    
    # Calculate theoretical PDFs for folded Lévy stable distributions
    pdf1 = np.array([levy_pdf(x, alpha=alpha1, beta=beta, scale=scale) for x in x_vals])
    pdf2 = np.array([levy_pdf(x, alpha=alpha2, beta=beta, scale=scale) for x in x_vals])
    
    # Create figure
    fig = go.Figure()
    
    # Add observed distributions (histograms)
    fig.add_trace(go.Bar(
        x=bin_centers,
        y=hist1,
        width=0.3,
        name=f'Observed_alpha={alpha1}_beta={beta}',
        marker_color='rgba(128, 0, 255, 0.7)',
        offset=-0.15  
    ))
    
    fig.add_trace(go.Bar(
        x=bin_centers,
        y=hist2,
        width=0.3,
        name=f'Observed_alpha={alpha2}_beta={beta}',
        marker_color='rgba(255, 0, 0, 0.7)',
        offset=0.15
    ))
    
    # Add theoretical distributions (curves)
    fig.add_trace(go.Scatter(
        x=x_vals,
        y=pdf1,
        mode='lines',
        name=f'Levy_alpha={alpha1}',
        line=dict(color='blue', width=2)
    ))
    
    fig.add_trace(go.Scatter(
        x=x_vals,
        y=pdf2,
        mode='lines',
        name=f'Levy_alpha={alpha2}',
        line=dict(color='red', width=2)
    ))

    fig.update_layout(
    title='Turning-angle Distribution - Source vs Observed',
    height=600,
    plot_bgcolor='rgb(225, 235, 245)'
    )
    
    
    return fig

# PLOTING

In [282]:
fig = plot_step_length_distributions()
fig.show()