# Smallpond

In this notebook we will test the current environment and get to know the tools involved: pandas, DuckDB, Ray, MinIO and Smallpond.

## Imports

```python
import ray
from minio import Minio
import os
import duckdb
import pandas as pd
import pyarrow.parquet as pq
from smallpond import init as smallpond_init
```

## Testing pandas and DuckDB
The following cell creates a small pandas DataFrame and queries it with SQL through DuckDB. DuckDB can refer to an in-scope DataFrame directly by its variable name (`df`).

```python
df = pd.DataFrame({"value": [10, 20, 30, 40, 50]})
# DuckDB resolves the table name `df` to the pandas DataFrame defined above
result = duckdb.query("SELECT AVG(value) AS avg_value FROM df").to_df()
print(result)
```

```
   avg_value
0       30.0
```


## Testing Ray
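Initialize Ray before running Smallpond. Passing the `address` parameter makes the notebook connect to the existing Ray cluster instead of starting a new one. The `ray://` prefix connects through Ray Client, which listens on port 10001 of the head node by default.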

```python
print("initializing ray>>>>")
# Connect through Ray Client to the ray-head node running inside the cluster
ray.init(address="ray://localhost:10001")
print("Connected to Ray cluster:", ray.cluster_resources())
```

```
initializing ray>>>>
Connected to Ray cluster: {'object_store_memory': 7142218137.0, 'node:172.18.0.5': 1.0, 'CPU': 36.0, 'node:172.18.0.4': 1.0, 'node:172.18.0.3': 1.0, 'node:__internal_head__': 1.0, 'memory': 15904223233.0}
```
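`ray.cluster_resources()` reports cluster-wide totals. The `memory` and `object_store_memory` values are in bytes, and there is one `node:<ip>` entry per node. In recent Ray versions `node:__internal_head__` only marks which node is the head, so it should not be counted as an extra node. The sketch below turns the output above into a short summary. The helper name `summarize_resources` is hypothetical, and the dictionary is copied from the printed output.

```python
# Copied from the ray.cluster_resources() output above
resources = {
    "object_store_memory": 7142218137.0, "node:172.18.0.5": 1.0, "CPU": 36.0,
    "node:172.18.0.4": 1.0, "node:172.18.0.3": 1.0,
    "node:__internal_head__": 1.0, "memory": 15904223233.0,
}

GIB = 1024 ** 3


def summarize_resources(res: dict) -> dict:
    """Hypothetical helper: node count, CPUs and memory (GiB) from cluster_resources()."""
    # Each node has a 'node:<ip>' key; skip the head-node marker so it is not double-counted
    nodes = [k for k in res if k.startswith("node:") and k != "node:__internal_head__"]
    return {
        "nodes": len(nodes),
        "cpus": res.get("CPU", 0.0),
        "memory_gib": round(res.get("memory", 0.0) / GIB, 2),
        "object_store_gib": round(res.get("object_store_memory", 0.0) / GIB, 2),
    }


summary = summarize_resources(resources)
print(summary)
```

For the output above, this reports 3 nodes, 36 CPUs, about 14.81 GiB of memory and 6.65 GiB of object store. On a live cluster you can pass `ray.cluster_resources()` directly.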


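```python
# @ray.remote turns each call of f into a task that Ray can schedule on any worker
@ray.remote
def f(x):
    return x * x

print("Running Ray tasks...")
# f.remote() returns object references immediately; ray.get() blocks until all results are ready
results = ray.get([f.remote(i) for i in range(4)])
print("Results:", results)

print("Shutting down Ray")
ray.shutdown()
```

```
Running Ray tasks...
Results: [0, 1, 4, 9]
Shutting down Ray
```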


If you don't initialize Ray yourself, Smallpond starts a single-node local Ray cluster instead. That still works, but you lose the distributed execution that Smallpond is designed for.

## Testing MinIO

```python
# Connect to MinIO; the client expects "host:port" without the URL scheme
minio_client = Minio(
    endpoint=os.environ.get("MINIO_ENDPOINT", "http://localhost:9000").replace("http://", ""),
    access_key=os.environ.get("MINIO_ACCESS_KEY", "minioadmin"),
    secret_key=os.environ.get("MINIO_SECRET_KEY", "minioadmin"),
    secure=False,  # plain HTTP, no TLS
)

# Create the bucket if it doesn't exist
bucket_name = "my-data"
if not minio_client.bucket_exists(bucket_name):
    minio_client.make_bucket(bucket_name)
    print(f"Created bucket: {bucket_name}")
else:
    print(f"Bucket already exists: {bucket_name}")

object_name = "data/sample.parquet"
local_file_path = "sample.parquet"  # or an absolute path, e.g. "/full/path/to/your/file/data/sample.parquet"
# Upload the local file to the bucket under object_name
minio_client.fput_object(
    bucket_name,
    object_name,
    local_file_path,
)
print(f"File '{local_file_path}' uploaded as '{object_name}' to bucket '{bucket_name}'.")
```

```
Bucket already exists: my-data
File 'sample.parquet' uploaded as 'data/sample.parquet' to bucket 'my-data'.
```
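The cell above removes only the `http://` prefix and always sets `secure=False`. As a result, an `https://` value in `MINIO_ENDPOINT` would not be handled. The sketch below derives both the `host:port` string and the `secure` flag from the URL with the standard library. The helper name `minio_endpoint` is hypothetical, and bare `host:port` values are assumed to mean plain HTTP.

```python
from typing import Tuple
from urllib.parse import urlparse


def minio_endpoint(url: str) -> Tuple[str, bool]:
    """Hypothetical helper: split an endpoint URL into (host[:port], secure)."""
    # Assume plain HTTP when no scheme is given, e.g. "localhost:9000"
    if "://" not in url:
        url = "http://" + url
    parsed = urlparse(url)
    if parsed.scheme not in ("http", "https"):
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    return parsed.netloc, parsed.scheme == "https"


endpoint, secure = minio_endpoint("http://localhost:9000")
print(endpoint, secure)
```

The two values can then be passed to the client as `Minio(endpoint=endpoint, secure=secure, access_key=..., secret_key=...)`.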


### Let's go to the MinIO dashboard

## Testing Smallpond
Make sure the MinIO credentials and endpoint are available as AWS-style environment variables in the notebook's environment by running the following cell:

```python
os.environ["AWS_ACCESS_KEY_ID"] = "minioadmin"
os.environ["AWS_SECRET_ACCESS_KEY"] = "minioadmin"
os.environ["AWS_REGION"] = "us-east-1"
os.environ["AWS_ENDPOINT_URL"] = "http://localhost:9000"
```

With these variables set, we can read objects from MinIO using the same `s3://bucket/key` URIs we would use for an Amazon S3 bucket.

```python
# Read the Parquet file from the MinIO bucket into a PyArrow Table
a_table = pq.read_table("s3://my-data/data/sample.parquet")
```

Next, initialize Ray and Smallpond, then load the PyArrow table into a Smallpond DataFrame:

```python
# Initialize Ray
ray.init(address="ray://localhost:10001")

# Point Smallpond at the same Ray head (ray://localhost:10001)
sp = smallpond_init(
    job_name="ProcessingTest",
    ray_address="ray://localhost:10001",
    data_root="data",
    num_executors=2,
    bind_numa_node=False,
    executor_resources={"CPU": 4, "memory": 8, "object_store_memory": 2 * 1024},
)

# Wrap the in-memory Arrow table as a Smallpond DataFrame and show the first 10 rows
df = sp.from_arrow(a_table)
print(df.take(10))

# Disconnect from the Ray cluster
ray.shutdown()
```
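The `smallpond_init` call above requests `"memory": 8` and `"object_store_memory": 2*1024` per executor. Ray measures both of these resources in bytes. If Smallpond passes the values straight through to Ray, they request only a few bytes. That is an assumption about Smallpond's handling, and it is worth checking against its documentation. The sketch below uses a hypothetical `executors_fit` helper to compare per-executor requests, written in explicit byte units, with the cluster totals printed earlier. It sums requests cluster-wide and ignores how the resources are split across individual nodes.

```python
GIB = 1024 ** 3

# Cluster totals copied from the ray.cluster_resources() output earlier in this notebook
cluster = {"CPU": 36.0, "memory": 15904223233.0, "object_store_memory": 7142218137.0}


def executors_fit(num_executors: int, per_executor: dict, totals: dict) -> bool:
    """Hypothetical helper: do num_executors copies of per_executor fit into totals?"""
    # Compares summed requests with cluster-wide totals only (no per-node packing)
    return all(
        num_executors * amount <= totals.get(name, 0.0)
        for name, amount in per_executor.items()
    )


# Assumption: memory values are in bytes, as Ray uses for these resources
per_executor = {"CPU": 4, "memory": 8 * GIB, "object_store_memory": 2 * GIB}
print(executors_fit(2, per_executor, cluster))  # False: 2 x 8 GiB exceeds ~14.8 GiB
print(executors_fit(2, {**per_executor, "memory": 6 * GIB}, cluster))  # True
```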

### Now let's check the Smallpond Dashboard