<a href="https://colab.research.google.com/github/sqbitegh/Colabs/blob/main/DataAnalyst.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install toolz==0.12.0
#!pip install matplotlib==3.7.1

In [1]:
from itertools import islice
from typing import Callable, List, Optional, Tuple

import matplotlib.pyplot as plt
import numpy as np
import plotly.express as px
from toolz import pipe, map, filter, partial, reduce, concat, take, drop


In [2]:


def read_vectors(filepath: str, max_rows: Optional[int] = None) -> List[List[float]]:
  """Reads whitespace-separated numeric vectors from a text file, one vector per line.

  Blank lines and lines containing the substring 'end' are skipped.

  Args:
    filepath: Path of the text file to read.
    max_rows: Maximum number of vectors to return. None (the default) returns
      every vector in the file. Must not be negative.

  Returns:
    The parsed vectors, in file order.

  Raises:
    ValueError: If a token on a retained line cannot be parsed as a float, or
      if max_rows is negative.
  """
  with open(filepath, 'r') as file:
    data_lines = (line for line in file if line.strip() and 'end' not in line)
    # islice stops consuming the file once max_rows vectors have been collected,
    # so later lines are neither read nor parsed.
    return [[float(num) for num in line.split()] for line in islice(data_lines, max_rows)]

def map_vectors(func: Callable[[List[float]], List[float]], vectors: List[List[float]]) -> List[List[float]]:
  """Returns a new list holding ``func(vector)`` for every vector, in input order."""
  return [func(vector) for vector in vectors]

def filter_vectors(predicate: Callable[[List[float]], bool], vectors: List[List[float]]) -> List[List[float]]:
  """Returns the vectors for which ``predicate`` is truthy, preserving their order."""
  return [vector for vector in vectors if predicate(vector)]

def sort_vectors_by_column(vectors: List[List[float]], column_index: int, reverse: bool = False) -> List[List[float]]:
  """Returns a new list of the vectors ordered by the value at ``column_index``.

  The input list is left unmodified. The order is ascending unless
  ``reverse`` is True.
  """
  def column_value(vector):
    return vector[column_index]

  ordered = list(vectors)
  ordered.sort(key=column_value, reverse=reverse)
  return ordered

def zip_vectors(vectors1: List[List[float]], vectors2: List[List[float]]) -> List[Tuple[List[float], List[float]]]:
  """Pairs up the vectors of both lists position by position.

  The result is as long as the shorter of the two inputs.
  """
  return [(first, second) for first, second in zip(vectors1, vectors2)]

def concat_vectors(vectors1: List[List[float]], vectors2: List[List[float]]) -> List[List[float]]:
  """Returns a new list with the vectors of ``vectors1`` followed by those of ``vectors2``."""
  return [*vectors1, *vectors2]

def cut_vectors(vectors: List[List[float]], start_index: int, end_index: int) -> List[List[float]]:
  """Returns the vectors from ``start_index`` up to, but not including, ``end_index``.

  Args:
    vectors: The list of vectors to cut.
    start_index: Index of the first vector to keep.
    end_index: Index one past the last vector to keep.

  Returns:
    A new list equal to ``vectors[start_index:end_index]``. Standard Python
    slice semantics apply, so out-of-range indices are clamped.
  """
  # end_index is an index, not a count: slicing keeps (end_index - start_index)
  # vectors rather than end_index vectors after the start.
  return list(vectors[start_index:end_index])

def read_bool_vector(filepath: str) -> List[float]:
  """Reads newline-separated boolean strings and converts them to floats.

  A line whose stripped text is exactly 'True' becomes 1.0; any other
  retained line becomes 0.0. Blank lines and lines containing the substring
  'end' are skipped, the same filtering read_vectors applies, so the flags
  stay aligned with the vectors they describe.

  Args:
    filepath: Path of the text file to read.

  Returns:
    One float (1.0 or 0.0) per retained line, in file order.
  """
  with open(filepath, 'r') as file:
    return [
        1.0 if line.strip() == 'True' else 0.0
        for line in file
        if line.strip() and 'end' not in line
    ]

def add_dimension(vectors: List[List[float]], new_dimension: List[float]) -> List[List[float]]:
    """Prepends the matching value of ``new_dimension`` to every vector.

    Values and vectors are paired by position; if the two lists differ in
    length, the result is as long as the shorter one.
    """
    extended = []
    for leading_value, vector in zip(new_dimension, vectors):
        extended.append([leading_value] + vector)
    return extended

def squeeze_columns(vectors: List[List[float]], c1: int, c2: int) -> List[List[float]]:
    """Replaces a run of adjacent columns in every vector with their sum.

    ``c1`` and ``c2`` act as 1-based, inclusive column positions: the summed
    run is ``vector[c1 - 1:c2]``. ``c1 == 0`` is treated like ``c1 == 1``
    (the run starts at the first column).

    NOTE(review): the rest of this file indexes columns from 0; confirm that
    the 1-based convention here is intended.
    """
    first = 0 if c1 == 0 else c1 - 1
    squeezed = []
    for vector in vectors:
        squeezed.append(vector[:first] + [sum(vector[first:c2])] + vector[c2:])
    return squeezed

def print_vector_info(vectors: List[List[float]]) -> None:
  """Prints how many vectors there are and the length of the first one.

  For a non-empty list the shape NumPy reports for the data is printed too;
  an empty list is reported with dimension 0.
  """
  num_vectors = len(vectors)
  has_vectors = num_vectors > 0
  vector_dim = len(vectors[0]) if has_vectors else 0

  print(f"Number of vectors: {num_vectors}")
  print(f"Dimension of vectors: {vector_dim}")

  if not has_vectors:
    return

  # NumPy's view of the same data, as a cross-check of the two numbers above
  print(f"Shape of vectors (NumPy): {np.array(vectors).shape}")



def visualize_output(output):
    """Visualizes a matrix of comparison codes as a colored image using matplotlib.

    Args:
        output: List of equal-length rows whose entries are 1, 2 or 3.
            1 is drawn blue, 2 yellow and 3 red.

    Raises:
        KeyError: If an entry is not 1, 2 or 3.

    When there is nothing to draw (no rows, or rows without columns) a message
    is printed and no figure is created.
    """
    # Identity lookup; it doubles as validation because any value other than
    # 1, 2 or 3 raises KeyError.
    color_map = {
        1: 1,  # Blue
        2: 2,  # Yellow
        3: 3   # Red
    }

    # Convert output to a NumPy array with numerical values
    numerical_output = np.array([[color_map[val] for val in row] for row in output])

    # An empty result (e.g. fewer than two input vectors) has no second axis
    # to size the figure from, so bail out before touching matplotlib.
    if numerical_output.ndim != 2 or numerical_output.size == 0:
        print("Nothing to visualize: output is empty.")
        return

    # Create a custom colormap
    from matplotlib.colors import ListedColormap
    cmap = ListedColormap(['blue', 'yellow', 'red'])  # Colors for values 1, 2, 3

    num_rows, num_cols = numerical_output.shape

    # Scale the figure with the matrix, but never below a readable minimum:
    # the raw scaling yields a figure of a few pixels for small matrices.
    fig_width = max(num_cols * 0.02, 4.0)
    fig_height = max(num_rows * 0.01, 3.0)

    plt.figure(figsize=(fig_width, fig_height))

    # Pin the color range to 1..3. Without vmin/vmax imshow autoscales to the
    # data, so a matrix lacking a 1 or a 3 would be drawn with the wrong colors.
    plt.imshow(numerical_output, aspect='auto', interpolation='nearest', cmap=cmap, vmin=1, vmax=3)
    plt.colorbar()  # Add a colorbar to show the mapping
    plt.show()



def visualize_output2(output, columns, s_row=0, n_rows=None):
  """
  Visualizes selected columns of the output as a scatter plot using Plotly.

  Args:
    output: The output data (list of equal-length lists).
    columns: A list of column indices to visualize (length 2 for 2D, 3 for 3D).
    s_row: Index of the first row to plot.
    n_rows: Exclusive end row index of the plotted slice (the rows plotted are
      ``output[s_row:n_rows]``); None plots through the last row.
      NOTE(review): the name suggests a row count, but it is used as an end
      index - confirm against callers.

  Invalid input (wrong number of columns, data that is not a 2-D table, or a
  column index outside the table) prints a message and returns without plotting.
  """
  if len(columns) < 2 or len(columns) > 3:
    print("Please provide either 2 or 3 column indices for visualization.")
    return

  # Convert output to a NumPy array for easier column access
  output_array = np.array(output)

  # An empty list gives a 1-D array with no column axis to index into.
  if output_array.ndim != 2:
    print("Nothing to visualize: output must be a non-empty list of equal-length rows.")
    return

  # A stop of None slices through the end, so both cases share one expression.
  output_array = output_array[s_row:n_rows]

  num_cols = output_array.shape[1]
  # Negative indices count from the last column, so the valid range is
  # -num_cols .. num_cols - 1.
  if any(col >= num_cols or col < -num_cols for col in columns):
    print("Invalid column index provided.")
    return

  series = [output_array[:, col] for col in columns]
  titles = [f"Column {col}" for col in columns]

  if len(columns) == 2:
    fig = px.scatter(x=series[0], y=series[1], width=300, height=300)
    fig.update_layout(
        xaxis_title=titles[0],
        yaxis_title=titles[1],
        title="2D Scatter Plot"
    )
  else:
    fig = px.scatter_3d(x=series[0], y=series[1], z=series[2], width=300, height=300)
    fig.update_layout(
        scene=dict(
            xaxis_title=titles[0],
            yaxis_title=titles[1],
            zaxis_title=titles[2]),
        title="3D Scatter Plot"
    )
  fig.show()


In [3]:
def initialize_datavector(activations_filename, correctness_vals):
  """Loads every activation vector and prepends a correctness flag to each.

  Args:
    activations_filename: Path of the vectors file, read with read_vectors.
    correctness_vals: Path of the True/False file, read with read_bool_vector,
      or None to flag every vector with 1.0.

  Returns:
    The vectors with the flag inserted as column 0. Summary information and
    the first ten merged vectors are printed along the way.
  """
  vectors = read_vectors(activations_filename, None)
  bool_vector = (
      [1.0] * len(vectors) if correctness_vals is None
      else read_bool_vector(correctness_vals)
  )
  print_vector_info(vectors)

  # add_dimension pairs flags and vectors with zip, so a length mismatch
  # truncates the result to the shorter of the two.
  merged = add_dimension(vectors, bool_vector)
  print(f"merged_vectors {merged[:10]}")
  print_vector_info(merged)
  return merged

def process_vectors_EuclChart(merged_vectors,cols, s_row, n_rows):
  """Scatter-plots the columns ``cols`` of ``merged_vectors`` via visualize_output2.

  ``s_row`` and ``n_rows`` are forwarded unchanged as the row-range arguments.
  """
  visualize_output2(merged_vectors, columns=cols, s_row=s_row, n_rows=n_rows)


def process_vectors(activations_filename, correctness_vals, sort_range, max_rows=None):
  """Loads vectors, keeps the ones flagged correct, and runs the comparison plots.

  Args:
    activations_filename: Path of the vectors file, read with read_vectors.
    correctness_vals: Path of the True/False file, read with read_bool_vector,
      or None to flag every vector with 1.0.
    sort_range: Iterable of column indices to sort by (one pass per column),
      or None for a single pass in file order.
    max_rows: Maximum number of vectors to read; None (the default) reads
      them all. The default lets the three-argument calls in this notebook
      work.
  """
  vectors = read_vectors(activations_filename, max_rows)
  if correctness_vals is None:
    bool_vector = [1.0] * len(vectors)
  else:
    bool_vector = read_bool_vector(correctness_vals)
  print_vector_info(vectors)
  print(f"bool_vector {bool_vector}")

  # Add the boolean vector as the first dimension to the existing vectors
  merged_vectors = add_dimension(vectors, bool_vector)
  print(f"merged_vectors {merged_vectors}")
  print_vector_info(merged_vectors)
  # Optional column squeezing, currently disabled:
  #merged_vectors = squeeze_columns(merged_vectors, 1, 6)
  #merged_vectors = squeeze_columns(merged_vectors, 8, 11)
  print(f"merged_vectors squeezed {merged_vectors}")
  print_vector_info(merged_vectors)
  print("filterby first column")
  # Column 0 holds the correctness flag; keep only the vectors flagged 1.0
  merged_vectors = filter_vectors(lambda vector: vector[0] == 1.0, merged_vectors)
  print_vector_info(merged_vectors)

  process_vectors2(merged_vectors, sort_range)

def process_vectors2(merged_vectors, sort_range):
  """Computes, prints and plots the adjacent-row comparison for each ordering.

  With ``sort_range`` None a single pass is made over the vectors in their
  given order. Otherwise one pass is made per column index in ``sort_range``,
  each over the vectors sorted ascending by that column.
  """
  if sort_range is None:
    orderings = [merged_vectors]
  else:
    # Generator: each sort runs right before the pass that uses it
    orderings = (
        sort_vectors_by_column(merged_vectors, column_index=sort_column)
        for sort_column in sort_range
    )

  for sorted_vectors in orderings:
    result = process_vectors3(sorted_vectors)
    print(f"output {result}")
    print_vector_info(result)
    visualize_output(result)
  #sorted_vectors = merged_vectors
  #sorted_vectors = filter_vectors(lambda vector: vector[0] == 1.0, sorted_vectors)

  #print (f"sorted_vectors {sorted_vectors}")
  #print_vector_info(sorted_vectors)

def process_vectors3(sorted_vectors):
  """Encodes how each column changes between consecutive vectors.

  Every vector after the first is compared with its predecessor, column by
  column, starting at column index 1 (column 0 is skipped). Each comparison
  yields 3 if the value is larger than the predecessor's, 2 if it is equal,
  and 1 otherwise.

  Returns:
    One list of codes per vector except the first; an empty list when fewer
    than two vectors are given.
  """
  def change_code(current, previous):
    if current > previous:
      return 3
    if current == previous:
      return 2
    return 1

  return [
      [change_code(row[col], previous_row[col]) for col in range(1, len(row))]
      for previous_row, row in zip(sorted_vectors, sorted_vectors[1:])
  ]






In [None]:


# Pass max_rows=None explicitly so that every row of the file is read.
process_vectors('activations_fc_input_d64_h1_epoch_4_test.txt', 'corrects_list_d64_h1_epoch_4_test.txt', range(1,32), None)
process_vectors('activations_fc_input_d64_h1_epoch_4_test.txt', 'corrects_list_d64_h1_epoch_4_test.txt', None, None)
#process_vectors('activations_fc_input_d64_h1_epoch_4_train.txt', 'corrects_list_d64_h1_epoch_4_train.txt', None, None)



In [None]:
# Activations from tinystories: no correctness file, so every vector is kept.
process_vectors('sample_data/activations_rec4ep20_f10000.txt', correctness_vals=None, sort_range=None, max_rows=1000)
process_vectors('sample_data/activations_rec4ep20_f10000.txt', correctness_vals=None, sort_range=range(5, 12), max_rows=600)


# From tinystories: 2D/3D scatter plots

In [23]:
# Load all vectors once; the plotting cells below reuse `datavector`.
datavector = initialize_datavector('sample_data/activations_rec6ep20_f10000.txt', correctness_vals=None)
# Alternative input file:
#datavector = initialize_datavector('sample_data/activations_rec4ep20_f10000.txt', correctness_vals=None)


Number of vectors: 9923
Dimension of vectors: 384
Shape of vectors (NumPy): (9923, 384)
merged_vectors [[1.0, -2.014961, -0.189692, 0.698256, -1.534862, 0.389196, -0.120946, 0.945911, -0.786093, -0.31611, 0.047418, 0.271338, 2.273068, -0.53373, -1.057709, -1.003096, 0.692656, 1.218659, 0.486906, 0.489107, 2.838626, 1.139455, -0.882304, -0.019262, -0.583819, 1.260327, 0.384413, 0.401562, -1.017081, 0.231304, -0.002782, -0.858961, 1.05292, 1.14623, 2.059783, -0.443097, 1.139218, -0.642852, 0.391504, -3.259594, -1.483317, -0.118397, -1.448511, -1.147241, -2.793817, 2.001098, 0.465996, 0.214453, -0.732558, 0.439515, -1.412888, 2.432482, 0.512338, -0.019663, 0.631106, 0.896996, 0.387827, 0.519085, -1.314049, -0.095021, -1.48908, -0.605023, 0.737824, -0.818622, -0.274139, 0.136743, 0.081298, -0.923815, -0.496886, -1.761149, 0.107752, 0.598983, -1.015799, -0.041935, -1.037191, 1.381775, -0.036522, -0.219831, 0.978968, 0.569763, 0.972294, -1.50043, 0.164055, -0.331759, -0.208184, -8.576047, 0.

In [None]:
# Scatter column 17 against column 18
process_vectors_EuclChart(datavector, [17, 18], s_row=700, n_rows=800)


In [None]:
sorted_vectors = sort_vectors_by_column(datavector, column_index=7)

# Scatter column 10 against each of columns 6..19 of the data sorted by column 7.
for x_col in range(10, 11):
    for y_col in range(x_col - 4, x_col + 10):
        process_vectors_EuclChart(sorted_vectors, [x_col, y_col], 10, 1000)

# Alternative sweep over neighbouring column pairs. Kept as '#' comments: a
# triple-quoted string here would be the cell's last expression and Jupyter
# would echo it as output.
# for x_col in range(1, 20):
#     for y_col in range(x_col + 1, x_col + 2):
#         process_vectors_EuclChart(sorted_vectors, [x_col, y_col], 10, 100)

Tests

In [None]:
# Smoke test on the small example files.
# Pass max_rows=None explicitly so that every row of the file is read.
process_vectors('example_vectors.txt', 'example_bool_vector.txt', None, None)
process_vectors('example_vectors.txt', 'example_bool_vector.txt', range(1,3), None)


In [None]:
# Example usage of the vector helpers
vectors = read_vectors('example_vectors.txt', max_rows=None)  # Requires 'example_vectors.txt' in the working directory

# Map: multiply every element of every vector by two
doubled_vectors = map_vectors(lambda vector: [2 * x for x in vector], vectors)

# Filter: keep the vectors whose first element is greater than zero
positive_vectors = filter_vectors(lambda vector: 0 < vector[0], vectors)

# Sort: order the vectors by column 1, largest value first
sorted_vectors = sort_vectors_by_column(vectors, 1, reverse=True)

# Zip: pair each vector with its doubled counterpart
zipped_vectors = zip_vectors(vectors, doubled_vectors)

# Concat: the original vectors followed by the doubled ones
concatenated_vectors = concat_vectors(vectors, doubled_vectors)

# Cut: call cut_vectors with start_index=2 and end_index=5
cut_vectors_result = cut_vectors(vectors, 2, 5)



FileNotFoundError: [Errno 2] No such file or directory: 'example_vectors.txt'