In [1]:
"""
Data Structures Practice Exercises

"""

from collections import defaultdict, Counter, deque
import string

In [2]:

def process_numbers(nums):
    """Return a new list in which every even number is doubled.

    Odd numbers are copied through unchanged and the input order is kept.

    Args:
        nums: iterable of integers.

    Returns:
        list with one entry per input value.
    """
    return [2 * value if value % 2 == 0 else value for value in nums]

# Sanity check on a tuple input: the odd 1 is kept and the even 2 is doubled.
process_numbers((1,2))

[1, 4]

In [3]:
def analyze_transactions(transactions):
    """Total the amounts per user and list the users whose total is negative.

    Args:
        transactions: iterable of ``(user, amount)`` pairs.

    Returns:
        A ``(totals, negative_users)`` tuple. ``totals`` is a plain dict
        mapping each user to the sum of their amounts, in first-seen order;
        ``negative_users`` lists the users whose total is below zero.
    """
    balances = {}
    for name, delta in transactions:
        balances[name] = balances.get(name, 0) + delta

    overdrawn = []
    for name, balance in balances.items():
        if balance < 0:
            overdrawn.append(name)

    return balances, overdrawn

            
# Example: alice nets 80, bob nets -110 (the only negative total), charlie nets 200.
analyze_transactions(transactions = [
        ('alice', 100), ('bob', -50), ('alice', -30), 
        ('charlie', 200), ('bob', -60), ('alice', 10)
    ])

({'alice': 80, 'bob': -110, 'charlie': 200}, ['bob'])

In [4]:
def analyze_string(text):
    """Compute simple character and word statistics for ``text``.

    Args:
        text: the string to analyse.

    Returns:
        dict with three keys:
          * ``'most_frequent_char'`` - the most common non-whitespace
            character, compared case-insensitively and reported in lower
            case (``''`` when there is none; ties go to the first one seen).
          * ``'longest_word'`` - the longest whitespace-separated word
            (``''`` when there is none; ties go to the first one).
          * ``'vowel_count'`` - number of ASCII vowels, in either case.
    """
    # Skip every kind of whitespace, not only ' ': words are split on any
    # whitespace below, so tabs/newlines must not be able to win the
    # "most frequent character" slot.
    char_count = Counter(char.lower() for char in text if not char.isspace())
    most_frequent = char_count.most_common(1)[0][0] if char_count else ''

    # str.split() with no argument splits on runs of any whitespace.
    words = text.split()
    longest_word = max(words, key=len) if words else ''

    vowels = set('aeiouAEIOU')
    vowel_count = sum(1 for char in text if char in vowels)

    return {
        'most_frequent_char': most_frequent,
        'longest_word': longest_word,
        'vowel_count': vowel_count
    }
    
    # Test cases
test_strings = [
    "hello world programming",
    "The quick brown fox jumps over the lazy dog",
    "Python is awesome for data manipulation",
]

# Print the statistics computed for each sample sentence.
for text in test_strings:
    result = analyze_string(text)
    print(f"'{text}' -> {result}")

'hello world programming' -> {'most_frequent_char': 'l', 'longest_word': 'programming', 'vowel_count': 6}
'The quick brown fox jumps over the lazy dog' -> {'most_frequent_char': 'o', 'longest_word': 'quick', 'vowel_count': 11}
'Python is awesome for data manipulation' -> {'most_frequent_char': 'a', 'longest_word': 'manipulation', 'vowel_count': 15}


In [5]:
#Pattern 1: Array/List Manipulation
#Question: "Find all pairs in an array that sum to a target value."

def two_sum(nums, target):
    """Return the indices of the first pair of entries that sum to ``target``.

    The list is scanned left to right; as soon as the current value's
    complement has already been seen, that pair is returned. Only this one
    pair is reported, not every matching pair.

    Args:
        nums: sequence of numbers.
        target: the required sum.

    Returns:
        ``[earlier_index, later_index]`` for the first match, or ``[]`` when
        no two entries add up to ``target``.
    """
    index_of = {}
    for position, value in enumerate(nums):
        # Stored values are always integer indices, so None means "not seen".
        partner = index_of.get(target - value)
        if partner is not None:
            return [partner, position]
        index_of[value] = position
    return []

In [6]:
#Pattern 2: String Processing
#Question: "Check if two strings are anagrams."
def is_anagram(s1, s2):
    """Return True when ``s1`` and ``s2`` hold the same characters with the same counts.

    The comparison is exact: case, spaces and punctuation all count.
    """
    first_counts = Counter(s1)
    second_counts = Counter(s2)
    # For strings, comparing sorted(s1) with sorted(s2) gives the same answer.
    return first_counts == second_counts

In [7]:
#Pattern 3: Hash Table Grouping
#Question: "Group words by their length."
def group_by_length(words):
    """Bucket ``words`` by their length.

    Args:
        words: iterable of strings (anything supporting ``len``).

    Returns:
        A plain dict mapping each length to the list of words of that length.
        Keys appear in first-seen order and each list keeps input order.
    """
    buckets = {}
    for item in words:
        buckets.setdefault(len(item), []).append(item)
    return buckets

In [8]:
import pandas as pd

def analyze_sentiment_data(csv_path):
    """Summarise per-name sentiment statistics from a CSV file.

    Args:
        csv_path: path (or file-like object) handed to ``pd.read_csv``. The
            data must contain ``name`` and ``sentiment`` columns.

    Returns:
        dict with three keys:
          * ``'average_sentiments'`` - mapping of name to mean sentiment.
          * ``'most_appearances'`` - the name with the most non-missing
            sentiment values.
          * ``'lowest_avg_sentiment'`` - ``(name, mean)`` for the name with
            the lowest mean sentiment.
    """
    frame = pd.read_csv(csv_path)

    # Only these two columns take part in the aggregation.
    per_name = (
        frame[['name', 'sentiment']]
        .groupby('name')
        .agg(
            average_sentiment=('sentiment', 'mean'),
            appearances=('sentiment', 'count'),
        )
        .reset_index()
    )

    # Pull out the full rows so both the name and its statistic are available.
    busiest_row = per_name.loc[per_name['appearances'].idxmax()]
    gloomiest_row = per_name.loc[per_name['average_sentiment'].idxmin()]

    averages_by_name = dict(zip(per_name['name'], per_name['average_sentiment']))

    return {
        'average_sentiments': averages_by_name,
        'most_appearances': busiest_row['name'],
        'lowest_avg_sentiment': (gloomiest_row['name'], gloomiest_row['average_sentiment']),
    }

# Example usage
# result = analyze_sentiment_data('path_to_your_csv_file.csv')
# print(result)


In [9]:
#Given a CSV of daily stock prices, calculate the 7-day moving average for each stock.
def calculate_moving_average(prices_dict, window=7):
    """Compute the simple moving average of each stock's price series.

    Args:
        prices_dict: mapping of stock identifier to a sequence of prices.
        window: number of consecutive prices averaged per output value.

    Returns:
        dict mapping each stock to its list of moving averages. A series
        shorter than ``window`` maps to an empty list; otherwise the list has
        ``len(prices) - window + 1`` entries, one per full window.
    """
    averages = {}
    for ticker, series in prices_dict.items():
        # Number of full windows that fit; non-positive means "too short".
        n_windows = len(series) - window + 1
        averages[ticker] = [
            sum(series[start:start + window]) / window
            for start in range(max(n_windows, 0))
        ]
    return averages

In [10]:
#Given a dictionary of portfolio weights, validate that they sum to 1.0 and no weight exceeds 50%.
def validate_portfolio(weights):
    """Check that portfolio weights sum to 1.0 and none exceeds 50%.

    Args:
        weights: mapping of asset identifier to its weight (a fraction).

    Returns:
        ``(is_valid, errors)`` where ``errors`` is a list of human-readable
        messages and ``is_valid`` is True only when that list is empty. An
        empty portfolio is reported as invalid because its weights sum to 0.
    """
    values = list(weights.values())
    total = sum(values)
    # default=0.0 keeps an empty portfolio from raising ValueError in max();
    # the sum check below still reports it as invalid.
    max_weight = max(values, default=0.0)

    errors = []
    # Tolerance absorbs floating-point rounding in the sum.
    if abs(total - 1.0) > 1e-6:
        errors.append(f"Weights sum to {total:.6f}, not 1.0")
    if max_weight > 0.5:
        errors.append(f"Maximum weight {max_weight:.3f} exceeds 50%")

    return not errors, errors

In [11]:
#Given a list of news articles with sentiment scores, group by date and calculate daily average sentiment.
def aggregate_sentiment(articles):
    """Average the sentiment scores of articles published on the same date.

    Args:
        articles: iterable of dicts, each with a ``'date'`` and a
            ``'sentiment'`` entry, e.g.
            ``{'date': '2024-01-01', 'sentiment': 0.8}``.

    Returns:
        dict mapping each date to the mean sentiment of its articles, with
        dates in first-seen order.
    """
    scores_by_date = {}
    for entry in articles:
        scores_by_date.setdefault(entry['date'], []).append(entry['sentiment'])

    daily_average = {}
    for day, day_scores in scores_by_date.items():
        daily_average[day] = sum(day_scores) / len(day_scores)
    return daily_average

In [12]:
import pandas as pd
import numpy as np

def backtest_moving_avg(prices_csv):
    """Backtest a long-only 5-day / 20-day moving-average crossover strategy.

    Args:
        prices_csv: path (or file-like object) handed to ``pd.read_csv``. The
            data must contain a ``date`` column and a ``close_price`` column.

    Returns:
        ``(total_return, sharpe, df)`` where ``total_return`` is the final
        cumulative strategy return, ``sharpe`` is the annualised Sharpe ratio
        of the daily strategy returns (population std, no risk-free rate), and
        ``df`` is the date-indexed frame with the added columns ``SMA5``,
        ``SMA20``, ``signal``, ``position``, ``hold``, ``returns`` and
        ``strat_returns``.
    """
    df = pd.read_csv(prices_csv, parse_dates=['date'])
    # Rolling windows and shifts below rely on chronological order.
    df = df.sort_values('date').set_index('date')
    df['SMA5'] = df['close_price'].rolling(window=5).mean()
    df['SMA20'] = df['close_price'].rolling(window=20).mean()

    # Create signal: 1 if SMA5 > SMA20, 0 otherwise
    # (rows where either average is still NaN compare False and so get 0)
    df['signal'] = np.where(df['SMA5'] > df['SMA20'], 1, 0)
    # Trade marker: +1 on the row where signal flips 0 -> 1 (buy), -1 where it flips 1 -> 0 (sell),
    # 0 otherwise. Not used in the calculations below; it is only returned as part of df.
    df['position'] = df['signal'].diff().fillna(0)
    # For simplicity: hold position of +1 when signal==1 else 0
    df['hold'] = df['signal'].shift(1).fillna(0)  # Use the previous row's signal, so today's return is not traded on today's close
    # Compute daily returns
    df['returns'] = df['close_price'].pct_change().fillna(0)
    # Strategy returns: returns when we are holding
    df['strat_returns'] = df['hold'] * df['returns']
    # Cumulative return
    cumulative = (1 + df['strat_returns']).cumprod() - 1
    # NOTE(review): .iloc[-1] raises IndexError when the CSV has no rows.
    total_return = cumulative.iloc[-1]
    # Sharpe (annualized): mean daily / std daily * sqrt(252)
    # NOTE(review): the std is not guarded; if every strategy return is identical
    # (e.g. the signal never turns on) this divides by zero and yields nan/inf.
    sharpe = df['strat_returns'].mean() / df['strat_returns'].std(ddof=0) * np.sqrt(252)
    return total_return, sharpe, df




In [13]:
import numpy as np
import pandas as pd
import cvxpy as cp

def min_var_weights(returns_df):
    """Solve for long-only minimum-variance portfolio weights.

    Minimises ``w' * cov * w`` subject to ``sum(w) == 1`` and ``w >= 0``,
    where ``cov`` is the sample covariance of the columns of ``returns_df``.
    Diagnostics (NaN counts, the covariance matrix, its determinant and the
    verbose solver log) are printed along the way.

    Args:
        returns_df: DataFrame with one column of returns per asset. Rows
            containing NaN are dropped before the covariance is computed.

    Returns:
        The solver's weight vector (one entry per column) when the problem
        status is optimal or optimal-inaccurate; ``None`` when the solver
        reports any other status or raises.

    Raises:
        ValueError: if dropping NaN rows leaves no data, or if the covariance
            matrix itself contains NaN.
    """
    # --- Debugging Step 1: Check for NaNs in input DataFrame ---
    if returns_df.isnull().sum().sum() > 0:
        print("Warning: Input DataFrame 'returns_df' contains NaN values.")
        print(returns_df.isnull().sum())
        # Option 1: Drop NaNs (simplest, but loses data)
        # dropna() returns a new frame, so the caller's DataFrame is left untouched.
        returns_df = returns_df.dropna()
        # Option 2: Fill NaNs (e.g., with 0 or forward-fill)
        # returns_df = returns_df.fillna(0) # or returns_df.fillna(method='ffill')
        print("NaN values handled by dropping rows with NaNs.")
        if returns_df.empty:
            raise ValueError("DataFrame became empty after dropping NaNs. Cannot proceed with covariance calculation.")

    cov = returns_df.cov().values  # n × n covariance, n = number of columns

    # --- Debugging Step 2: Inspect the covariance matrix ---
    print("\nCalculated Covariance Matrix:")
    print(pd.DataFrame(cov, columns=returns_df.columns, index=returns_df.columns))

    if np.isnan(cov).any():
        raise ValueError("Covariance matrix contains NaN values. This usually means there were not enough non-NaN data points for calculation.")

    # Rough near-singularity warning based on the determinant. This is not a
    # positive semi-definiteness test (that would need the eigenvalues).
    # NOTE(review): the absolute threshold is scale-dependent - with small return
    # variances or many assets the determinant is tiny even for a well-conditioned
    # matrix, so this warning can fire spuriously.
    # NOTE(review): np.linalg.det normally returns ~0 for a singular matrix rather
    # than raising, so the LinAlgError handler is probably never reached - confirm.
    try:
        det_cov = np.linalg.det(cov)
        print(f"\nDeterminant of covariance matrix: {det_cov:.2e}")
        if det_cov < 1e-15: # Arbitrary small threshold
            print("Warning: Covariance matrix determinant is very small, might be near singular.")
    except np.linalg.LinAlgError:
        print("Warning: Could not compute determinant, possibly singular matrix.")


    n = cov.shape[0]

    # Define optimization variable
    w = cp.Variable(n)
    # Objective: minimize wᵀ Σ w
    risk = cp.quad_form(w, cov)
    objective = cp.Minimize(risk)
    # Constraints: sum(w)=1, w >= 0
    constraints = [cp.sum(w) == 1, w >= 0]

    prob = cp.Problem(objective, constraints)

    # --- Solution Step 3 & 4: Try a different solver and enable verbose output ---
    try:
        # Try ECOS, which is generally quite robust.
        # verbose=True will print solver output to help diagnose issues.
        prob.solve(solver=cp.ECOS, verbose=True)
        # If ECOS fails, uncomment the line below to try SCS
        # prob.solve(solver=cp.SCS, verbose=True)

        if prob.status == cp.OPTIMAL or prob.status == cp.OPTIMAL_INACCURATE:
            return w.value  # array of length n
        else:
            print(f"\nSolver status: {prob.status}")
            print("Problem could not be solved optimally.")
            return None # Or raise an error
    except Exception as e:
        # Deliberate best-effort: any failure while solving is reported and turned into None.
        print(f"\nAn error occurred during solving with ECOS: {e}")
        print("Consider trying another solver like cp.SCS or checking your data for issues.")
        return None


# --- Main execution ---
# Load the returns file, parse 'date' as datetimes and index the frame by it.
returns_df = pd.read_csv(
    "/workspaces/backtesting/investment-portfolio-project/data/synthetic_portfolio.csv",
    parse_dates=['date'],
).set_index('date')

# Hand the optimiser a defensive copy of the frame.
weights = min_var_weights(returns_df.copy())

if weights is None:
    print("\nCould not determine optimal weights.")
else:
    print("\nOptimal Weights:", weights)

(CVXPY) Jun 12 12:21:18 PM: Your problem has 3 variables, 4 constraints, and 0 parameters.
(CVXPY) Jun 12 12:21:18 PM: It is compliant with the following grammars: DCP, DQCP
(CVXPY) Jun 12 12:21:18 PM: (If you need to solve this problem multiple times, but with different data, consider using parameters.)
(CVXPY) Jun 12 12:21:18 PM: CVXPY will first compile your problem; then, it will invoke a numerical solver to obtain a solution.
(CVXPY) Jun 12 12:21:18 PM: Your problem is compiled with the CPP canonicalization backend.
(CVXPY) Jun 12 12:21:18 PM: Compiling problem (target solver=ECOS).
(CVXPY) Jun 12 12:21:18 PM: Reduction chain: Dcp2Cone -> CvxAttr2Constr -> ConeMatrixStuffing -> ECOS
(CVXPY) Jun 12 12:21:18 PM: Applying reduction Dcp2Cone



Calculated Covariance Matrix:
          A         B         C
A  0.000100 -0.000017 -0.000027
B -0.000017  0.000180 -0.000012
C -0.000027 -0.000012  0.000095

Determinant of covariance matrix: 1.53e-12
                                     CVXPY                                     
                                     v1.6.6                                    
-------------------------------------------------------------------------------
                                  Compilation                                  
-------------------------------------------------------------------------------


(CVXPY) Jun 12 12:21:18 PM: Applying reduction CvxAttr2Constr
(CVXPY) Jun 12 12:21:18 PM: Applying reduction ConeMatrixStuffing
(CVXPY) Jun 12 12:21:19 PM: Applying reduction ECOS
(CVXPY) Jun 12 12:21:19 PM: Finished problem compilation (took 6.844e-01 seconds).
(CVXPY) Jun 12 12:21:19 PM: Invoking solver ECOS  to obtain a solution.
(CVXPY) Jun 12 12:21:19 PM: Problem status: optimal
(CVXPY) Jun 12 12:21:19 PM: Optimal value: 2.511e-05
(CVXPY) Jun 12 12:21:19 PM: Compilation took 6.844e-01 seconds
(CVXPY) Jun 12 12:21:19 PM: Solver (including time spent in interface) took 2.102e-03 seconds


-------------------------------------------------------------------------------
                                Numerical solver                               
-------------------------------------------------------------------------------

ECOS 2.0.10 - (C) embotech GmbH, Zurich Switzerland, 2012-15. Web: www.embotech.com/ECOS

It     pcost       dcost      gap   pres   dres    k/t    mu     step   sigma     IR    |   BT
 0  +0.000e+00  -2.027e-01  +5e+00  5e-01  6e-01  1e+00  1e+00    ---    ---    1  1  - |  -  - 
 1  +2.000e-04  -5.566e-02  +7e-01  2e-02  6e-02  7e-03  2e-01  0.9478  7e-02   1  1  1 |  0  0
 2  +2.019e-04  -3.505e-04  +9e-03  3e-04  1e-03  2e-04  2e-03  0.9875  2e-03   1  1  1 |  0  0
 3  +3.435e-05  +6.484e-06  +5e-04  1e-05  5e-05  6e-06  1e-04  0.9542  5e-04   1  1  1 |  0  0
 4  +2.520e-05  +2.283e-05  +4e-05  1e-06  5e-06  1e-06  1e-05  0.9644  7e-02   1  1  1 |  0  0
 5  +2.511e-05  +2.508e-05  +6e-07  2e-08  7e-08  2e-08  1e-07  0.9861  1e-04   1  1  1 |  0 

In [14]:
import numpy as np

def performance_metrics(daily_ret):
    """Compute annualised return, volatility and Sharpe ratio from daily returns.

    Uses 252 trading days per year, the sample standard deviation (ddof=1)
    and a risk-free rate of zero.

    Args:
        daily_ret: NumPy array of daily simple returns.

    Returns:
        ``(annual_ret, annual_vol, sharpe)``.
    """
    trading_days = 252
    annualiser = np.sqrt(trading_days)
    daily_std = np.std(daily_ret, ddof=1)

    # Compound the daily returns, then rescale the holding period to one year.
    cumulative = np.prod(1 + daily_ret) - 1
    annual_ret = (1 + cumulative) ** (trading_days / len(daily_ret)) - 1

    annual_vol = daily_std * annualiser
    sharpe = np.mean(daily_ret) / daily_std * annualiser

    return annual_ret, annual_vol, sharpe

# Example usage: metrics = performance_metrics(np.array([...]))
# print(f"Ann. Return: {metrics[0]:.2%}, Vol: {metrics[1]:.2%}, Sharpe: {metrics[2]:.2f}")


In [None]:
# given the following string

s = "abcd"

# write some code that will reverse it: 

def string_reversal(s: str) -> str:
    """Return ``s`` with its characters in reverse order."""
    # A slice with step -1 walks the string from its last character to its first.
    return s[::-1]

string_reversal(s)  # expected result: 'dcba'



'dcba'

In [None]:
# write a function that converts lists that look like `input_list` into something that looks like `expected_list`. 
# Assume that any input to `format_list` will take the form of a list comprising strings that have a single underscore.

def format_list(li: list[str]) -> list[str]:
    """Insert each string's list position at its underscore.

    ``"item_thing"`` at index 0 becomes ``"item_0_thing"``, ``"next_item"`` at
    index 1 becomes ``"next_1_item"``, and so on.

    Args:
        li: list of strings, each assumed to contain a single underscore.

    Returns:
        A new list of the formatted strings, in the same order as ``li``.
    """
    # The earlier loop built each formatted string but never collected or
    # returned it, so the function always returned None.
    return [f"_{i}_".join(item.split("_")) for i, item in enumerate(li)]
        







# Worked example from the exercise: each string's list index is inserted at its underscore.
input_list = ["item_thing", "next_item", "third_thingie", "howard_johnson"] 
expected_list = ["item_0_thing", "next_1_item", "third_2_thingie", "howard_3_johnson"] 
test_list = format_list(input_list)
# Raises AssertionError unless format_list returns exactly the expected list.
assert test_list == expected_list
