In [None]:
from syftbox.lib import config_for_user, syftbox_code

In [None]:
# Client configuration for this user's datasite (displayed below).
# NOTE(review): the user email is hard-coded; consider a config constant at the top.
client_config = config_for_user("madhava@openmined.org")
client_config

In [None]:
# Activate this client config (presumably makes it the default for later syftbox calls — confirm).
client_config.use()

In [None]:
# Datasets returned by the client config; displayed as the cell output.
datasets = client_config.get_datasets()
datasets

In [None]:
# Re-displays `datasets` exactly as the previous cell already does; redundant — consider deleting.
datasets

In [None]:
# NOTE(review): assumes the first dataset is the Netflix one — confirm get_datasets() ordering.
netflix = datasets[0]

In [None]:
# Show the selected dataset.
netflix

In [None]:
# Import the `netflix_tmdb_imdb` dataset exposed under the david.rolle.at.gmail.com datasite module.
# NOTE(review): the next cell imports the same name from another datasite and shadows this binding.
from syftbox.lib.david.rolle.at.gmail.com.datasets import netflix_tmdb_imdb

netflix_tmdb_imdb

In [None]:
# Import `netflix_tmdb_imdb` from the me.at.madhavajay.com datasite module.
# This rebinds (shadows) the `netflix_tmdb_imdb` imported in the previous cell.
from syftbox.lib.me.at.madhavajay.com.datasets import netflix_tmdb_imdb

netflix_tmdb_imdb

In [None]:
def reset_folder(path="./crypto/data"):
    """Delete the encrypted-aggregate folder so it can be re-created from zero.

    Args:
        path: folder to remove recursively. Defaults to ``./crypto/data``.

    A missing folder is not an error, because there is nothing to reset.
    Any other failure (e.g. a permission error) is raised rather than silently
    ignored, so a failed reset is not mistaken for a clean one.
    """
    import shutil

    try:
        shutil.rmtree(path)
    except FileNotFoundError:
        pass

In [None]:
# private
def create_and_get_he_context(crypto_folder="./crypto"):
    """Load this party's Pyfhel context and keys from ``crypto_folder``, or create them.

    If ``pyfhel.context``, ``pyfhel.secret`` and ``pyfhel.pk`` all exist in
    ``crypto_folder``, they are loaded. Otherwise a new BFV context and key
    pair are generated and saved there, overwriting any partial set.

    Args:
        crypto_folder: directory for the context/key files; created if missing.

    Returns:
        A ``Pyfhel`` instance with the context, secret key and public key set.
    """
    import os

    from Pyfhel import Pyfhel

    os.makedirs(crypto_folder, exist_ok=True)
    context_path = os.path.join(crypto_folder, "pyfhel.context")
    secret_key_path = os.path.join(crypto_folder, "pyfhel.secret")
    public_key_path = os.path.join(crypto_folder, "pyfhel.pk")

    HE = Pyfhel()
    # Require the complete set, all at `crypto_folder`. Previously only the secret
    # key was checked (at a hard-coded "crypto/" path), so a missing context or
    # public key made the load branch crash.
    if all(os.path.exists(p) for p in (context_path, secret_key_path, public_key_path)):
        print("Loading HE keys")
        HE.load_context(context_path)
        HE.load_secret_key(secret_key_path)
        HE.load_public_key(public_key_path)
    else:
        print("Generating new HE keys")
        # NOTE(review): n bounds how many values fit in one ciphertext (slice_size
        # elsewhere is 30_000), and t_bits bounds the plaintext modulus. Aggregated
        # totals presumably wrap if they exceed it — confirm total_time fits.
        HE.contextGen(scheme="bfv", n=2**15, t_bits=20)
        HE.keyGen()
        HE.save_secret_key(secret_key_path)
        HE.save_public_key(public_key_path)
        HE.save_context(context_path)

    return HE


# Load (or, on first run, generate and save) the HE context and keys under ./crypto.
HE = create_and_get_he_context()

In [None]:
# private
def create_he_data(
    HE,
    crypto_folder="./crypto",
    stats_keys=("total_time", "total_views", "total_unique_show_views"),
    max_tv_id=300_000,
    slice_size=30_000,
):
    """Write zero-valued ciphertexts that datasites will add their stats onto.

    Creates ``<crypto_folder>/data/totals``, which encrypts one zero per entry of
    ``stats_keys``. It also creates ``<crypto_folder>/data/view_counts/tmdb_id_NN``,
    which are encrypted zero vectors of length ``slice_size``; together they cover
    ids ``0..max_tv_id``.

    Args:
        HE: Pyfhel instance used to encrypt (needs ``encryptInt``).
        crypto_folder: root folder for the encrypted data.
        stats_keys: names of the totals entries; only the count is used here.
        max_tv_id: largest tmdb_id that must be covered (an estimate).
        slice_size: ids per slice ciphertext; must fit in the HE context.

    Raises:
        ValueError: if ``slice_size`` is not positive.
    """
    import os

    import numpy as np

    if slice_size <= 0:
        raise ValueError("slice_size must be positive")

    stat_folder = os.path.join(crypto_folder, "data")
    totals_path = os.path.join(stat_folder, "totals")
    slice_folder = os.path.join(stat_folder, "view_counts")
    os.makedirs(slice_folder, exist_ok=True)  # also creates stat_folder

    # One encrypted zero per stat; each datasite adds its plaintext totals onto it.
    HE.encryptInt(np.zeros(len(stats_keys), dtype=int)).save(totals_path)

    # Same number of slices as range(0, max_tv_id + 1, slice_size) → ids 0..max_tv_id.
    num_slices = max_tv_id // slice_size + 1
    for index in range(num_slices):
        slice_path = os.path.join(slice_folder, f"tmdb_id_{index:02}")
        HE.encryptInt(np.zeros(slice_size, dtype=int)).save(slice_path)
    print("HE Data Created")


# Wipe ./crypto/data and re-create zero-valued encrypted aggregates.
# NOTE(review): re-running this discards every contribution collected so far. It also
# leaves the ./crypto/<datasite> sentinel files that netflix_stats writes, so datasites
# that already contributed would skip on their next run — confirm this cell is meant
# to run only once, before any datasite contributes.
reset_folder()
create_he_data(HE)

In [None]:
# public federated code
@syftbox_code
def netflix_stats(datasite, df):
    """Add this datasite's Netflix TV stats onto the shared encrypted aggregates.

    Only TV titles watched in the current calendar year are counted. For those
    titles the function computes total runtime, number of views, number of
    distinct shows, and per-``tmdb_id`` view counts. It then adds them as
    plaintext onto the ciphertexts under ``./crypto/data`` (``totals`` and
    ``view_counts/tmdb_id_NN``). A sentinel file ``./crypto/<datasite>`` is
    written at the end. If the sentinel already exists, the function returns
    immediately, so each datasite contributes once.

    Imports stay inside the body, presumably because ``@syftbox_code`` ships
    this function to other datasites — confirm before hoisting them.

    Args:
        datasite: identifier of the contributing datasite; used as the
            sentinel file name.
        df: viewing history with ``netflix_date``, ``tmdb_media_type``,
            ``imdb_runtime_minutes``, ``imdb_id`` and ``tmdb_id`` columns.
            It is not modified.

    Raises:
        Exception: if the shared encrypted files have not finished syncing,
            or a ``tmdb_id`` maps to a slice file that does not exist. Both
            checks run before any ciphertext is written, so a failed run can
            be retried without double counting.
    """
    import datetime
    import os

    import numpy as np
    import pandas as pd
    from Pyfhel import PyCtxt, Pyfhel

    crypto_folder = "./crypto"
    completed_sentinel = f"{crypto_folder}/{datasite}"
    if os.path.exists(completed_sentinel):
        print("✅ Already generated 🔐 Homomorphically Encrypted Stats")
        return

    stat_folder = f"{crypto_folder}/data"
    part_path = f"{stat_folder}/totals"
    slice_folder = f"{stat_folder}/view_counts"
    slice_size = 30_000  # must match create_he_data; max size of the HE context

    # Check the synced inputs really exist. The old check used os.path.abspath,
    # which is always truthy, so it never fired.
    for path in [stat_folder, part_path, slice_folder]:
        if not os.path.exists(path):
            raise Exception(f"Requires {path} to finish syncing")

    imdb_id_files = os.listdir(slice_folder)
    if len(imdb_id_files) < 10:
        raise Exception(f"Requires {slice_folder} to finish syncing")

    # Adding plaintext to a ciphertext needs only the context (the public key is
    # loaded as before). The secret key is only needed to decrypt, so it is
    # deliberately not loaded in this federated code.
    HE = Pyfhel()
    HE.load_context(f"{crypto_folder}/pyfhel.context")
    HE.load_public_key(f"{crypto_folder}/pyfhel.pk")

    # Parse dates into a separate Series so the caller's DataFrame is not mutated.
    current_year = datetime.datetime.now().year
    netflix_dates = pd.to_datetime(df["netflix_date"])
    year_tv_df = df[(netflix_dates.dt.year == current_year) & (df["tmdb_media_type"] == "tv")]

    # Order must match the stats_keys used by create_he_data / decode_results.
    stats = {
        "total_time": int(year_tv_df["imdb_runtime_minutes"].sum()),
        "total_views": int(len(year_tv_df)),
        "total_unique_show_views": int(year_tv_df["imdb_id"].nunique()),
    }
    stats_array = np.array([int(v) for v in stats.values()], dtype=int)

    # Bucket per-show view counts into one plaintext vector per slice, so each
    # slice ciphertext is loaded, updated and saved once instead of once per show.
    value_counts = year_tv_df["tmdb_id"].value_counts().astype(int)
    slice_updates = {}
    for tmdb_id, count in value_counts.items():
        index, sub_index = divmod(int(tmdb_id), slice_size)
        if index not in slice_updates:
            slice_updates[index] = np.zeros(slice_size, dtype=int)
        slice_updates[index][sub_index] += int(count)

    # Validate every target slice before writing anything. Otherwise a failure
    # partway through would leave the totals updated without the sentinel, and a
    # retry would count this datasite twice.
    slice_paths = {index: f"{slice_folder}/tmdb_id_{index:02}" for index in slice_updates}
    for index, slice_path in slice_paths.items():
        if not os.path.exists(slice_path):
            raise Exception(f"Requires {slice_path} (tmdb_id slice {index}) to exist")

    totals = PyCtxt(pyfhel=HE)
    totals.load(part_path)
    totals += stats_array
    totals.save(part_path)

    for index, counts in slice_updates.items():
        tv_count_slice = PyCtxt(pyfhel=HE)
        tv_count_slice.load(slice_paths[index])
        tv_count_slice += counts
        tv_count_slice.save(slice_paths[index])

    with open(completed_sentinel, "w") as f:
        print("✅ Writing 🔐 Homomorphically Encrypted Stats")
        f.write(str(datetime.datetime.now()))


# netflix_stats("me@madhavajay.com", netflix_tmdb_imdb.load())

In [None]:
def decode_results(HE, stat_keys, path, max_tv_id=300_000, slice_size=30_000):
    """Decrypt the aggregated totals and per-show view counts stored under ``path``.

    Args:
        HE: Pyfhel instance holding the secret key that matches the ciphertexts.
        stat_keys: names for the entries of the encrypted totals vector, in the
            order they were written.
        path: directory containing the ``crypto`` folder (relative or absolute).
        max_tv_id: largest tmdb_id covered by the slices (must match create_he_data).
        slice_size: ids per slice ciphertext (must match create_he_data).

    Returns:
        dict mapping each name in ``stat_keys`` to its decrypted int total, plus
        ``"value_counts"``: ``{tmdb_id: views}`` for every id with a non-zero
        count, sorted by views descending (ties keep ascending id order).
    """
    import os

    import numpy as np
    from Pyfhel import PyCtxt

    # os.path.join keeps an absolute `path` absolute. The old f"./{path}/crypto"
    # form turned "/abs/dir" into the relative ".//abs/dir/crypto".
    stat_folder = os.path.join(path, "crypto", "data")
    totals_path = os.path.join(stat_folder, "totals")
    slice_folder = os.path.join(stat_folder, "view_counts")

    # decode stats — uses the stat_keys argument (previously the global stats_keys)
    totals = PyCtxt(pyfhel=HE)
    totals.load(totals_path)
    totals_array = HE.decryptInt(totals)
    stats = {key: int(totals_array[i]) for i, key in enumerate(stat_keys)}

    tmdb_id_value_counts = {}
    num_slices = max_tv_id // slice_size + 1  # same count as create_he_data writes
    for index in range(num_slices):
        tv_count_slice = PyCtxt(pyfhel=HE)
        tv_count_slice.load(os.path.join(slice_folder, f"tmdb_id_{index:02}"))
        tv_count_array = np.asarray(HE.decryptInt(tv_count_slice))
        offset = index * slice_size
        for sub_index in np.nonzero(tv_count_array)[0]:
            tmdb_id_value_counts[int(sub_index) + offset] = int(tv_count_array[sub_index])

    # Sort once after all slices are merged (it was previously re-sorted every slice).
    stats["value_counts"] = dict(sorted(tmdb_id_value_counts.items(), key=lambda item: item[1], reverse=True))
    return stats


# Names for the entries of the encrypted totals vector, in the order netflix_stats
# writes them (its `stats` dict) — keep the two in sync.
stats_keys = [
    "total_time",
    "total_views",
    "total_unique_show_views",
    # "year_fav_day"
]
# Decrypt the aggregates under ./crypto with the secret key held by HE.
all_results = decode_results(HE, stats_keys, "./")
all_results

In [None]:
def top_k_summary(all_results, num_parties, top_k=5):
    """Summarise decrypted aggregate stats as per-party averages plus top shows.

    Args:
        all_results: dict with "total_time", "total_views",
            "total_unique_show_views" and "value_counts" ({tmdb_id: views}).
        num_parties: number of contributing parties to average over.
        top_k: how many of the most-viewed shows to keep.

    Returns:
        dict with rounded "avg_time", "avg_views" and "avg_unique_show_views".
        The ``top_k`` most-viewed shows are stored under the key "top_5"; that
        key name is fixed regardless of ``top_k``.
    """

    def per_party(total_key):
        return round(all_results[total_key] / num_parties)

    # Dict literal entries evaluate left to right, so averages are computed before sorting.
    return {
        "avg_time": per_party("total_time"),
        "avg_views": per_party("total_views"),
        "avg_unique_show_views": per_party("total_unique_show_views"),
        "top_5": dict(
            sorted(
                all_results["value_counts"].items(),
                key=lambda pair: pair[1],
                reverse=True,
            )[:top_k]
        ),
    }


# NOTE(review): num_parties is hard-coded to 1; the averages are per-party only if this
# matches the number of datasites that actually contributed.
top_k_summary(all_results, 1)

In [None]:
# Materialise `datasets` into a plain list (the type is displayed as a sanity check).
dataset_list = list(datasets)
type(dataset_list)

In [None]:
# Inspect the datasite of the second dataset (raises IndexError if there are fewer than two).
dataset_list[1].syft_link.datasite

In [None]:
# Package netflix_stats as a flow over every dataset; `path` is presumably where the
# flow was written — confirm.
# NOTE(review): the input is named "dfs" while netflix_stats takes `df`; confirm to_flow
# maps each dataset onto that parameter.
path = netflix_stats.to_flow(client_config=client_config, inputs={"dfs": dataset_list})
path