In [1]:
import dask
dask.config.set({'dataframe.query-planning': True})
import dask.dataframe as dd

In [5]:
def load_imdb(filepath1, filepath2, netflix_path):
    data_rating = dd.read_csv(filepath1, sep="\t")
    data_basic = dd.read_csv(
        filepath2,
        sep="\t",
        dtype={"runtimeMinutes": "object", "startYear": "object", "isAdult": "object"},
    )
    netflix_data = dd.read_csv(netflix_path)
    return data_rating, data_basic, netflix_data



def merge_and_save(data_basic, data_rating, netflix_data, output_path):
    """Performs merging and saving, optimized with Dask."""
    final_data = data_basic[["tconst", "titleType", "primaryTitle", "isAdult", "startYear", "runtimeMinutes", "genres"]]
    merged_data = dd.merge(final_data, data_rating, on="tconst")

    # Lowercase transformation for title matching
    merged_data["Title_lower"] = merged_data["primaryTitle"].str.lower()
    netflix_data["Title_lower"] = netflix_data["Title"].str.lower()

    # Group by and compute idxmax in a more efficient way
    idxmax_computed = merged_data.groupby("Title_lower")["numVotes"].idxmax()
    
    # Merge instead of using .loc
    idxmax_computed = idxmax_computed.compute().reset_index()
    imdb_max_votes = dd.merge(merged_data, idxmax_computed, left_index=True, right_on='Title_lower')


    # Merge with Netflix data
    final_merge = dd.merge(imdb_max_votes, netflix_data, on="Title_lower", how="inner")

    # Writing to multiple files in parallel
    final_merge.to_csv(f"{output_path}final_merged_data_dask_*.csv", index=False)






In [3]:
import time

# ------------------------------
# Main Execution (Sample Paths)
# ------------------------------
IMDB_RATINGS_PATH = "/Users/pranavsukumaran/Desktop/netflix/project/data/data-2.tsv"
IMDB_BASICS_PATH = "/Users/pranavsukumaran/Desktop/netflix/project/data/data-3.tsv"
NETFLIX_PATH = "/Users/pranavsukumaran/Desktop/netflix/project/data/processed_data.csv"
OUTPUT_PATH = "/Users/pranavsukumaran/Desktop/netflix/project/data/"
start_time = time.time()
# Load the datasets

data_rating, data_basic, netflix_data = load_imdb(
    IMDB_RATINGS_PATH, IMDB_BASICS_PATH, NETFLIX_PATH
)
end_time = time.time()

total_time = end_time - start_time

print(f"Processing time using Dask: {total_time:.2f} seconds")



Processing time using Dask: 0.04 seconds


In [6]:
start_time = time.time()

merge_and_save(data_basic, data_rating, netflix_data, OUTPUT_PATH)

end_time = time.time()

total_time = end_time - start_time

print(f"Processing time using Dask: {total_time:.2f} seconds")

KeyError: 'Title_lower'

In [None]:
netflix_data

In [None]:
data_basic

In [None]:
data_rating

In [None]:
final_data = data_basic[
        [
            "tconst",
            "titleType",
            "primaryTitle",
            "startYear",
            "runtimeMinutes",
            "genres",
        ]
    ]

In [None]:
final_data

In [None]:
merged_data = dd.merge(final_data, data_rating, on="tconst")

In [None]:
merged_data

In [None]:
merged_data["Title_lower"] = merged_data["primaryTitle"].str.lower()
netflix_data["Title_lower"] = netflix_data["Title"].str.lower()

In [None]:
idxmax_computed = merged_data.groupby("Title_lower")["numVotes"].idxmax().compute()