In [1]:
!pip install pyspark
!apt install openjdk-8-jdk-headless -qq

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/45/b0/9d6860891ab14a39d4bddf80ba26ce51c2f9dc4805e5c6978ac0472c120a/pyspark-3.1.1.tar.gz (212.3MB)
[K     |████████████████████████████████| 212.3MB 65kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 18.0MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.1.1-py2.py3-none-any.whl size=212767604 sha256=101ed974451c3f07a1c419328dd97bb438fee4db58aeb6f8eeae5f31c71f1f11
  Stored in directory: /root/.cache/pip/wheels/0b/90/c0/01de724414ef122bd05f056541fb6a0ecf47c7ca655f8b3c0f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.1.1
The 

In [2]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
%matplotlib inline

from sklearn.model_selection import train_test_split
from sklearn.decomposition import SparsePCA

import scipy.sparse as sparse

import pyspark
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf
from pyspark.mllib.linalg.distributed import RowMatrix
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.feature import StandardScaler

from datetime import datetime
import os

In [3]:
# Spark configuration: serve the Spark UI on port 4050.
conf = SparkConf().set("spark.ui.port", "4050")

# Reuse the running SparkContext if one exists. Calling the SparkContext
# constructor a second time (e.g. when this cell is re-run) fails, because only
# one context may be active per process.
sc = pyspark.SparkContext.getOrCreate(conf=conf)
spark = SparkSession.builder.getOrCreate()

In [4]:
# Mount Google Drive so the project CSVs under MyDrive can be read.
from google.colab import drive

drive.mount(
    "/content/drive",
    force_remount=True,  # remount even if an earlier run already mounted it
)

Mounted at /content/drive


In [27]:
# Identifiers for the distance measure used by get_distance / MDAV.
EUCLIDEAN_DISTANCE, MANHATTAN_DISTANCE = 1, 2

In [28]:
# Formats epoch seconds as a naive-UTC 'YYYY-mm-dd HH:MM:SS' string. The ratings
# load below no longer needs it; it is kept for any other cell that references it.
dateparse = lambda x: datetime.utcfromtimestamp(int(x)).strftime('%Y-%m-%d %H:%M:%S')

files_path = '/content/drive/MyDrive/CSE547_Final_Project/ml-100k'
ratings_file = os.path.join(files_path, "ratings.csv")
movies_file = os.path.join(files_path, "movies.csv")
# Path (not the data itself) of the precomputed user x movie ratings matrix.
user_movie_ratings_matrix = os.path.join(files_path, "user_movie_ratings_matrix.csv")

# `timestamp` holds Unix epoch seconds. A single vectorized conversion with
# unit='s' yields the same naive-UTC datetimes as the old per-row `date_parser`,
# which pandas 2.0 deprecated.
ratings_df = pd.read_csv(ratings_file)
ratings_df['timestamp'] = pd.to_datetime(ratings_df['timestamp'], unit='s')
movies_df = pd.read_csv(movies_file)

In [29]:
# Load the user x movie matrix: first line is the header, column types are inferred.
user_movie_ratings_df = (
    spark.read
    .option("header", True)
    .option("inferSchema", True)
    .csv(user_movie_ratings_matrix)
)

In [30]:
# Every column except the user id becomes one entry of the feature vector.
feature_columns = list(user_movie_ratings_df.columns)
feature_columns.remove('userId')

vector_assembler = VectorAssembler(inputCols=feature_columns, outputCol='features_sparse')
user_movie_ratings_features_df = (
    vector_assembler
    .transform(user_movie_ratings_df)
    .select('userId', 'features_sparse')
)

In [31]:
# Center and scale each feature; the standardized vector goes in 'features'.
standardizer = StandardScaler(
    inputCol='features_sparse',
    outputCol='features',
    withMean=True,
    withStd=True,
)
standardizer_model = standardizer.fit(user_movie_ratings_features_df)
user_movie_ratings_standardized_features_df = standardizer_model.transform(user_movie_ratings_features_df)

# Row RDDs holding the raw and the standardized feature vectors, respectively.
features_rows_rdd = user_movie_ratings_standardized_features_df.select("features_sparse").rdd
scaled_features_rows_rdd = user_movie_ratings_standardized_features_df.select("features").rdd

In [32]:
# Build (row_index, np.array(row)) pairs from the standardized features.
# Derived from the DataFrame rather than by re-wrapping scaled_features_rows_rdd,
# so re-running this cell cannot zip an already-indexed RDD a second time.
scaled_features_rows_rdd = (
    user_movie_ratings_standardized_features_df.select("features").rdd
    .zipWithIndex()
    .map(lambda row_and_index: (row_and_index[1], np.array(row_and_index[0])))
)

In [33]:
# Build (row_index, np.array(row)) pairs from the unscaled features.
# Derived from the DataFrame rather than by re-wrapping features_rows_rdd,
# so re-running this cell cannot zip an already-indexed RDD a second time.
features_rows_rdd = (
    user_movie_ratings_standardized_features_df.select("features_sparse").rdd
    .zipWithIndex()
    .map(lambda row_and_index: (row_and_index[1], np.array(row_and_index[0])))
)

In [34]:
def euclidean_distance(point1, point2):
    """Return the *squared* Euclidean distance between two numpy points.

    The square root is never taken. Callers only rank points by distance, and
    squaring preserves that order.
    """
    difference = point1 - point2
    return np.linalg.norm(difference) ** 2

In [35]:
def manhattan_distance(point1, point2):
    """Return the Manhattan (L1) distance: the sum of absolute element-wise differences."""
    absolute_gaps = abs(point1 - point2)
    return absolute_gaps.sum()

In [36]:
def get_distance(point, centroid, distance_measure = EUCLIDEAN_DISTANCE):
    """Pair ``point`` with its distance to ``centroid``.

    Args:
        point: the point being measured.
        centroid: the reference point.
        distance_measure: EUCLIDEAN_DISTANCE (squared Euclidean) or MANHATTAN_DISTANCE.

    Returns:
        ``(point, distance)``.

    Raises:
        NotImplementedError: for any other ``distance_measure``.
    """
    if distance_measure == EUCLIDEAN_DISTANCE:
        return (point, euclidean_distance(point, centroid))
    if distance_measure == MANHATTAN_DISTANCE:
        return (point, manhattan_distance(point, centroid))
    raise NotImplementedError(f'{distance_measure} is not implemented.')

In [37]:
def get_centroid(data_rdd, distance_measure = EUCLIDEAN_DISTANCE):
    """Return the record of ``data_rdd`` closest to the mean of all its records.

    NOTE(review): classic MDAV uses the mean record itself as the centroid.
    Here the nearest *actual* record is returned instead — confirm this is intended.

    Args:
        data_rdd: RDD of ``(index, point)`` pairs; must be non-empty.
        distance_measure: EUCLIDEAN_DISTANCE or MANHATTAN_DISTANCE.

    Returns:
        That record's point, wrapped in ``np.array``.
    """
    total = data_rdd.count()
    mean_point = data_rdd.values().sum() / total

    def _with_distance(record):
        # Keep the index so the result mirrors the (index, (point, distance)) shape.
        return record[0], get_distance(record[1], mean_point, distance_measure)

    nearest = data_rdd.map(_with_distance).takeOrdered(1, key=lambda item: item[1][1])
    closest_point = nearest[0][1][0]
    return np.array(closest_point)

In [38]:
def get_farthest_from_centroid(data_rdd, centroid, distance_measure = EUCLIDEAN_DISTANCE):
    """Return the record of ``data_rdd`` that lies farthest from ``centroid``.

    Args:
        data_rdd: RDD of ``(index, point)`` pairs; must be non-empty.
        centroid: reference point compared against every record.
        distance_measure: EUCLIDEAN_DISTANCE or MANHATTAN_DISTANCE.

    Returns:
        The farthest point, wrapped in ``np.array``.
    """
    def _with_distance(record):
        return record[0], get_distance(record[1], centroid, distance_measure)

    # Negating the distance turns takeOrdered's ascending order into "largest first".
    top = data_rdd.map(_with_distance).takeOrdered(1, key=lambda item: -item[1][1])
    farthest_point = top[0][1][0]
    return np.array(farthest_point)

In [39]:
def microaggregate(points_list, method = 'avg'):
    """Collapse a group of points into one representative point.

    Args:
        points_list: sequence of equally shaped numpy points.
        method: aggregation rule; only ``'avg'`` (element-wise mean) is supported.

    Returns:
        The aggregated point.

    Raises:
        NotImplementedError: for any other ``method``.
    """
    group_size = len(points_list)

    if method != 'avg':
        raise NotImplementedError(f'{method} is not implemented.')

    return np.sum(points_list, axis=0) / group_size

In [40]:
def group_k_points(data_rdd, center, k, method = 'avg', distance_measure = EUCLIDEAN_DISTANCE):
    """Group the ``k`` records nearest to ``center`` and microaggregate them.

    Args:
        data_rdd: RDD of ``(index, point)`` pairs.
        center: point the group is built around.
        k: number of records to take.
        method: aggregation rule forwarded to ``microaggregate``.
        distance_measure: EUCLIDEAN_DISTANCE or MANHATTAN_DISTANCE.

    Returns:
        ``(microaggregated_point, k_closest)``, where ``k_closest`` is the list of
        ``(index, point)`` pairs in the group, ordered nearest first.
    """
    def _with_distance(record):
        return record[0], get_distance(record[1], center, distance_measure)

    nearest = data_rdd.map(_with_distance).takeOrdered(k, key=lambda item: item[1][1])
    k_closest = [(item[0], item[1][0]) for item in nearest]

    members = [point for _, point in k_closest]
    microaggregated = microaggregate(members, method)
    return microaggregated, k_closest

In [41]:
def MDAV(data_rdd, k = 5, distance_measure = EUCLIDEAN_DISTANCE, method = 'avg'):
    """Microaggregate ``data_rdd`` into groups of at least ``k`` records (MDAV).

    Each round picks C = get_centroid(...), P = the record farthest from C and
    Q = the record farthest from P. It then groups P with its k nearest remaining
    records, removes them, and does the same for Q. Rounds continue while at
    least 2k records remain. Every group is replaced by its microaggregated record.

    Args:
        data_rdd: RDD of ``(index, point)`` pairs. Assumes indices are unique,
            because grouped records are removed with ``subtractByKey``.
        k: minimum group size; must be >= 1.
        distance_measure: EUCLIDEAN_DISTANCE or MANHATTAN_DISTANCE.
        method: aggregation rule forwarded to ``microaggregate``.

    Returns:
        2-D array with one microaggregated row per group. If fewer than ``k``
        records are left over at the end, they are dropped rather than released
        as an undersized group. If no group can be formed at all, the result is an
        empty ``(0, num_features)`` array.

    Raises:
        ValueError: if ``k < 1`` (the loop would never terminate).
    """
    if k < 1:
        raise ValueError(f'k must be a positive integer, got {k}.')

    num_total_points = data_rdd.count()
    num_points = num_total_points

    groups = []          # microaggregated rows, appended in group order
    remaining = data_rdd
    while num_points >= 2*k:
        centroid = get_centroid(remaining, distance_measure)
        far_p = get_farthest_from_centroid(remaining, centroid, distance_measure)
        far_q = get_farthest_from_centroid(remaining, far_p, distance_measure)

        previous = remaining
        for k_anon_center in [far_p, far_q]:
            microaggregated, grouped = group_k_points(remaining, k_anon_center, k, method, distance_measure)
            groups.append(microaggregated)

            # subtractByKey only looks at keys, so there is no need to ship the
            # grouped point arrays back to the cluster.
            grouped_keys_rdd = data_rdd.context.parallelize([(idx, None) for idx, _ in grouped])
            remaining = remaining.subtractByKey(grouped_keys_rdd)

        # Cache the shrunken RDD. Otherwise every action in the next round
        # replays the whole, ever-growing chain of subtractByKey operations.
        remaining = remaining.cache()
        num_points = remaining.count()
        if previous is not data_rdd:
            previous.unpersist()   # never unpersist the caller's RDD
        print(f'Number of points remaining: {num_points} / {num_total_points}')

    # Fewer than 2k records are left here. Any k..2k-1 of them form one last
    # valid group. Fewer than k would violate k-anonymity as their own group and
    # could distort an earlier group if merged into it, so they are dropped.
    if num_points >= k:
        remain = remaining.map(lambda p: p[1]).collect()
        groups.append(microaggregate(remain, method))

    if remaining is not data_rdd:
        remaining.unpersist()

    if groups:
        return np.vstack(groups)
    num_features = data_rdd.first()[1].shape[-1] if num_total_points else 0
    return np.empty((0, num_features))

In [42]:
distance_measure = EUCLIDEAN_DISTANCE
method = 'avg'
scaled = False

# Anonymize either the raw ratings or their standardized version.
ratings_data_rdd = scaled_features_rows_rdd if scaled else features_rows_rdd

# Write one anonymized CSV per k, e.g. "5_anonymized.csv" or "5_anonymized_scaled.csv".
k_values = [2, 3, 5, 8, 12, 15, 20, 25]
for k in k_values:
    anonymized = MDAV(ratings_data_rdd, k, distance_measure, method)

    anon_file_name = f"{k}_anonymized{'_scaled' if scaled else ''}.csv"
    anon_file_path = os.path.join(files_path, anon_file_name)
    np.savetxt(anon_file_path, anonymized, delimiter=",")

    print(f'Done with k = {k}')

Number of points remaining: 606 / 610
Number of points remaining: 602 / 610
Number of points remaining: 598 / 610
Number of points remaining: 594 / 610
Number of points remaining: 590 / 610
Number of points remaining: 586 / 610
Number of points remaining: 582 / 610
Number of points remaining: 578 / 610
Number of points remaining: 574 / 610
Number of points remaining: 570 / 610
Number of points remaining: 566 / 610
Number of points remaining: 562 / 610
Number of points remaining: 558 / 610
Number of points remaining: 554 / 610
Number of points remaining: 550 / 610
Number of points remaining: 546 / 610
Number of points remaining: 542 / 610
Number of points remaining: 538 / 610
Number of points remaining: 534 / 610
Number of points remaining: 530 / 610
Number of points remaining: 526 / 610
Number of points remaining: 522 / 610
Number of points remaining: 518 / 610
Number of points remaining: 514 / 610
Number of points remaining: 510 / 610
Number of points remaining: 506 / 610
Number of po