# Local processing

In [1]:
import dask.dataframe as dd

# Read the edges TSV file into a Dask DataFrame
edges_df = dd.read_csv('data/edges.tsv', sep='\t', names=['source', 'target'])

# Create graph incidence list by grouping by source column
# This will give us for each source node all its target nodes
graph_incidence = edges_df.groupby('source')['target'].apply(
    lambda x: list(x),
    meta=('target', 'object')  # specify meta for proper schema inference
).compute()  # compute to bring result into memory

print("First few entries of the graph incidence list:")
print(graph_incidence.head())

First few entries of the graph incidence list:
source
1              [762]
2    [578, 282, 845]
5         [233, 405]
8               [81]
9          [650, 17]
Name: target, dtype: object


# Distributed processing

## Setting up Dask on Google Cloud Platform

To run Dask on GCP, we need:
1. Google Cloud SDK installed and configured
2. A GCP project with the required APIs enabled:
   - Compute Engine API
   - Cloud Resource Manager API
3. Authentication set up (`gcloud auth application-default login`)
4. Dependencies installed:
   ```bash
   pip install dask-cloudprovider google-cloud-storage
   ```

The following code will set up a Dask cluster on GCP.

In [9]:
from dask_cloudprovider.gcp import GCPCluster
from dask.distributed import Client
import os

# Configure your GCP project and zone from environment variables
project = os.environ.get('GCP_PROJECT_ID')
zone = os.environ.get('GCP_ZONE', 'us-central1-a')      # Default to us-central1-a if not set

if not project:
    raise ValueError("GCP_PROJECT_ID environment variable not set. Please set it in your .env file.")

# Create a GCP cluster
cluster = GCPCluster(
    projectid=project,
    zone=zone,
    n_workers=0,            # Number of worker nodes
    machine_type='n1-standard-1',  # Machine type for workers
    # filesystem_size=50,           # Disk size in GB
    preemptible=True,
    worker_class='dask_cloudprovider.gcp.GCPWorker',
    debug=True
)

# Create a Dask client
client = Client(cluster)

# Print cluster dashboard link
print(f"Dask dashboard available at: {client.dashboard_link}")

Launching cluster with the following configuration: 
  Source Image: projects/ubuntu-os-cloud/global/images/ubuntu-minimal-1804-bionic-v20201014 
  Docker Image: daskdev/dask:latest 
  Machine Type: n1-standard-1 
  Filesystem Size: 50 
  Disk Type: pd-standard 
  N-GPU Type:  
  Zone: us-central1-a 
Creating scheduler instance

Cloud init


#cloud-config


# Bootstrap
packages:
  - apt-transport-https
  - ca-certificates
  - curl
  - gnupg-agent
  - software-properties-common
  - ubuntu-drivers-common

# Enable ipv4 forwarding, required on CIS hardened machines
write_files:
  - path: /etc/sysctl.d/enabled_ipv4_forwarding.conf
    content: |
      net.ipv4.conf.all.forwarding=1

# create the docker group
groups:
  - docker

# Add default auto created user to docker group
system_info:
  default_user:
    groups: [docker]


runcmd:
  
  # Install Docker
  - curl -fsSL https://download.docker.com/linux/ubuntu/gpg | apt-key add -
  - add-apt-repository "deb [arch=amd64] https://download.do

  next(self.gen)


Dask dashboard available at: http://35.226.133.114:8787/status



+---------+-----------------+-----------------+---------+
| Package | Client          | Scheduler       | Workers |
+---------+-----------------+-----------------+---------+
| lz4     | None            | 4.3.3           | None    |
| python  | 3.11.12.final.0 | 3.10.12.final.0 | None    |
| toolz   | 1.0.0           | 0.12.0          | None    |
| tornado | 6.5.2           | 6.5.1           | None    |
+---------+-----------------+-----------------+---------+


In [10]:
cluster.get_logs()

In [11]:
import dask.dataframe as dd

# Read the edges TSV file into a Dask DataFrame
edges_df = dd.read_csv('data/edges.tsv', sep='\t', names=['source', 'target'])

# Create graph incidence list by grouping by source column
# This will give us for each source node all its target nodes
graph_incidence = edges_df.groupby('source')['target'].apply(
    lambda x: list(x),
    meta=('target', 'object')  # specify meta for proper schema inference
).compute()  # compute to bring result into memory

print("First few entries of the graph incidence list:")
print(graph_incidence.head())

RuntimeError: Error during deserialization of the task graph. This frequently
occurs if the Scheduler and Client have different environments.
For more information, see
https://docs.dask.org/en/stable/deployment-considerations.html#consistent-software-environments


In [12]:
cluster.close()

Closing Instance: dask-f3e4cf7e-scheduler
