In [2]:
import cudf
import cugraph

In [3]:
ratings = cudf.read_csv('data/ml-32m/ratings.csv')

ratings.head()

Unnamed: 0,userId,movieId,rating,timestamp
0,1,17,4.0,944249077
1,1,25,1.0,944250228
2,1,29,2.0,943230976
3,1,30,5.0,944249077
4,1,32,5.0,943228858


In [4]:
movie_descs = cudf.read_csv('data/movies_with_description.csv')

# Filter all ratings that not in movie_descs

ratings = ratings[ratings['movieId'].isin(movie_descs['movieId'])]

In [9]:
from collections import defaultdict
from tqdm.notebook import tqdm, trange

# Create an edgelist from the dataframe
edges = defaultdict(lambda: 0)

ratings_filtered = ratings.query("rating >= 4")

# Group by movieId and create edges
for movie_id, group in tqdm(ratings_filtered.groupby('movieId'), desc="Processing ratings", leave=False):
    users = group['userId'].values
    for i in trange(len(users), leave=False):
        for j in range(i + 1, len(users)):
            edges[(users[i].item(), users[j].item())] += 1
            edges[(users[j].item(), users[i].item())] += 1


Processing ratings:   0%|          | 0/33491 [00:00<?, ?it/s]

  0%|          | 0/45518 [00:00<?, ?it/s]

KeyboardInterrupt: 

In [None]:
from os import cpu_count
import dask.dataframe as dd
from dask import delayed
from collections import defaultdict
from tqdm.notebook import tqdm, trange
from dask.distributed import Client, progress
from os import environ

cpus = cpu_count() // 2

client = Client(f"{environ["HOSTNAME"]}:88991", n_workers=cpus, threads_per_worker=1, processes=True, memory_limit='4GB')

# Create an edgelist from the dataframe
edges = defaultdict(lambda: 0)

ratings_filtered = ratings.query("rating >= 4")

# Convert to Dask DataFrame
dask_ratings_filtered = dd.from_pandas(ratings_filtered.to_pandas(), npartitions=8)

@delayed
def process_group(movie_id, group):
    local_edges = defaultdict(lambda: 0)
    users = group['userId'].values
    for i in range(len(users)):
        for j in range(i + 1, len(users)):
            local_edges[(users[i].item(), users[j].item())] += 1
            local_edges[(users[j].item(), users[i].item())] += 1
    return local_edges

movie_groups = dask_ratings_filtered.groupby('movieId')

# Group by movieId and create edges in parallel
delayed_results = []
for movieId in tqdm(dask_ratings_filtered['movieId'].unique(), desc="Processing ratings", leave=False):
    group = movie_groups.get_group(movieId)
    delayed_results.append(process_group(movie_id, group))

# Compute the results using the Dask client
results = client.compute(delayed_results, sync=True)

# Combine the results
for local_edges in results:
    for key, value in local_edges.items():
        edges[key] += value


Perhaps you already have a cluster running?
Hosting the HTTP server on port 33701 instead


This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


Processing ratings:   0%|          | 0/33491 [00:00<?, ?it/s]

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/

KeyboardInterrupt: 

In [7]:
len(edges)

100111