In [None]:
!pip install pyspark
!pip install -U -q PyDrive
!apt install openjdk-8-jdk-headless -qq
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m19.3 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=22a3e79a35ceb0e789d82e08b32ab1092ac2e88c2b015fe04528c0761ba35a4d
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [104]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [1]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType
from pyspark.sql.functions import col, current_timestamp, to_date, hour, dayofweek\

import numpy as np

from itertools import combinations

In [3]:
def create_new_spark_context(appName):
    return SparkSession.builder.appName(appName)\
        .master("local[*]").getOrCreate()


spark_session = create_new_spark_context("LSH")
sc = spark_session.sparkContext


23/01/23 20:22:40 WARN Utils: Your hostname, Nasers-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 192.168.76.238 instead (on interface en0)
23/01/23 20:22:40 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/23 20:22:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
schema = StructType([
    StructField("DEVICE_CODE", IntegerType(), True),
    StructField("SYSTEM_ID", IntegerType(), True),
    StructField("ORIGINE_CAR_KEY", StringType(), True),
    StructField("FINAL_CAR_KEY", StringType(), True),
    StructField("CHECK_STATUS_KEY", IntegerType(), True),
    StructField("COMPANY_ID", StringType(), True),
    StructField("PASS_DAY_TIME", TimestampType(), True)
])


In [7]:
df = spark_session.read.csv(
    '/content/drive/MyDrive/MDA/HW3/TrafficData.csv', header=True, schema=schema)
df.show(1)


+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|DEVICE_CODE|SYSTEM_ID|ORIGINE_CAR_KEY|FINAL_CAR_KEY|CHECK_STATUS_KEY|COMPANY_ID|      PASS_DAY_TIME|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
|     200501|       81|       10477885|     10477885|               5|       161|2021-06-01 03:54:39|
+-----------+---------+---------------+-------------+----------------+----------+-------------------+
only showing top 1 row



In [8]:
traffic_rdd = df.rdd.map(lambda x: ((x["FINAL_CAR_KEY"], x["PASS_DAY_TIME"].date()), x["DEVICE_CODE"]))\
                    .groupByKey()\
                    .map(lambda x: (x[0], set(x[1])))


In [9]:
# make a numpy array with size of the number of distinct device codes
device_codes = traffic_rdd.flatMap(lambda x: tuple(x[1])).distinct().collect()
num_device = len(device_codes)
num_device

                                                                                

993

In [10]:
# hash function to map each device code to a number between 0 and num_device
device_index_map = {}
for i in range(num_device):
    device_index_map[device_codes[i]] = i

In [13]:
path_vec = np.zeros(num_device)
indices = np.random.choice(np.arange(num_device), replace=False,
                           size=int(num_device * 0.8))
path_vec[indices] = 1
path = []
for i in range(len(path_vec)):
    if path_vec[i] == 1:
        path.append(device_codes[i])


In [14]:
len(path)

794

In [15]:
def path_similarity(x):
    similarity = 0
    for device_code in x:
        similarity += path_vec[device_index_map[device_code]]
    return similarity / ((len(x) ** 0.5) * len(path) ** 0.5)


most_similar_path = traffic_rdd.map(lambda x: (x[0], path_similarity(x[1]))).sortBy(lambda x: x[-1], ascending=False)
most_similar_path.take(5)

                                                                                

[(('64111706', datetime.date(2021, 6, 1)), 0.8410357816782252),
 (('8073331', datetime.date(2021, 6, 1)), 0.24832062214018286),
 (('7633319', datetime.date(2021, 6, 1)), 0.24713098667733246),
 (('17610801', datetime.date(2021, 6, 1)), 0.2247049817646692),
 (('29485775', datetime.date(2021, 6, 1)), 0.20865462592433567)]

In [24]:
b = 10
r = 15
num_planes = b * r

random_planes = []
for i in range(num_planes):
    random_planes.append(np.random.choice([-1.0, 1.0], size=num_device))
random_planes = np.array(random_planes)
random_planes

array([[-1., -1.,  1., ..., -1., -1.,  1.],
       [-1., -1.,  1., ...,  1., -1., -1.],
       [ 1.,  1., -1., ...,  1.,  1., -1.],
       ...,
       [-1., -1., -1., ..., -1., -1.,  1.],
       [ 1.,  1., -1., ...,  1., -1., -1.],
       [ 1., -1.,  1., ..., -1., -1.,  1.]])

In [25]:
def calculate_hash(x):
    items = x
    lst = ""
    for plane in random_planes:
        i = 0
        for item in items:
            i += plane[device_index_map[item]]
        z = 1
        if i < 0:
            z = 0
        lst += str(z)
    return lst

hashed = traffic_rdd.map(lambda x : (x[0], calculate_hash(x[1])))

In [26]:
def hash_vector(x):
    hash_values = []
    for plane in random_planes:
        hash_value = 0
        for code in x:
            hash_value += plane[device_index_map[code]]
        hash_values.append(hash_value)
    sig = "".join(["1" if x > 0 else "0" for x in hash_values])
    return sig


In [27]:
hashed_path = hash_vector(path)


In [30]:
def match_hash(x):
    first = 0
    last = r-1
    for i in range(b):
        if hashed_path[first:last] == x[first:last]:
            return True
        first += r
        last += r
    return False

In [31]:
candidates = hashed.filter(lambda x: match_hash(x[1])).collect()
len(candidates)

                                                                                

645

In [32]:
unique_candidates = set(map(lambda x: tuple(x[0]), candidates))

In [36]:
# get the most similar path from the candidates
threshold = 0.8

most_similar_path = traffic_rdd.map(lambda x: (x[0], (tuple(x[1]), path_similarity(x[1]))))\
    .filter(lambda x: x[0] in unique_candidates)\
    .filter(lambda x: x[1][1] > threshold)\
    .collect()


                                                                                

In [38]:
len(most_similar_path)

1

In [39]:
ans = most_similar_path[0]
ans[0], ans[1][1]

(('64111706', datetime.date(2021, 6, 1)), 0.8410357816782252)