BIG DATA ANALYTICS - Final Project
NAME : MABLINE ANDREA
STUDENT ID : 00231505
GROUP NAME : FP J


The below  tasks are performed using trial_COVID-19_Hospital_Impact.csv

Task 1.A. Extract binary features (i.e. a sparse representation of the characteristic
matrix) per hospital
For each record, treat the first element as the unique record id. For each of the 108 other
columns, treat them as a binary feature with an id as string of the form:
"column_name:value" and add that to a set that represents the values that are "1" in the
characteristic matrix (every element of the set not present is assumed to be 0).

In [1]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("BinaryFeatureExtraction").getOrCreate()

hospitals_rdd = spark.sparkContext.textFile("/content/trial_COVID-19_Hospital_Impact.csv", 32)

header = hospitals_rdd.first()
header_list = header.split(",")
header_broadcast = spark.sparkContext.broadcast(header_list)

def binary_features_generation(row):
    values = row.split(",")
    if len(values) != len(header_broadcast.value):
        return None
    hospital_pk = values[0]
    features = set()
    for i, value in enumerate(values[1:], start=1):
        value = value.strip()
        if not value:
            continue
        col_name = header_broadcast.value[i]
        feature = f"{col_name}:{value}"
        features.add(feature)
    return (hospital_pk, features)

features_rdd = hospitals_rdd.filter(lambda x: x != header) \
    .map(binary_features_generation) \
    .filter(lambda x: x is not None)

reduced_rdd = features_rdd.reduceByKey(lambda x, y: x.union(y))

target_hospital_pks = {"150034", "50739", "330231", "241326", "070008"}
filtered_rdd = reduced_rdd.filter(lambda x: x[0] in target_hospital_pks)

results = filtered_rdd.collect()
for hospital_pk, features in results:
    print(f"({hospital_pk}, {features})")
with open("output_task_1.txt", "w") as f:
    for hospital_pk, features in results:
        f.write(f"{hospital_pk}, {features}\n")


(070008, {'total_adult_patients_hospitalized_confirmed_and_suspected_covid_7_day_coverage:7', 'total_pediatric_patients_hospitalized_confirmed_and_suspected_covid_7_day_sum:0', 'previous_day_admission_adult_covid_confirmed_18-19_7_day_sum:0', 'total_pediatric_patients_hospitalized_confirmed_and_suspected_covid_7_day_coverage:7', 'previous_day_total_ED_visits_7_day_sum:323', 'previous_day_admission_adult_covid_suspected_7_day_coverage:6', 'address:201 CHESTNUT HILL ROAD', 'previous_day_admission_adult_covid_suspected_40-49_7_day_sum:0', 'staffed_icu_adult_patients_confirmed_covid_7_day_sum:0', 'previous_day_admission_adult_covid_confirmed_70-79_7_day_sum:0', 'all_adult_hospital_beds_7_day_sum:459', 'previous_day_admission_adult_covid_suspected_7_day_sum:-999999', 'total_adult_patients_hospitalized_confirmed_covid_7_day_sum:0', 'all_adult_hospital_inpatient_beds_7_day_sum:422', 'previous_day_admission_adult_covid_confirmed_80+_7_day_sum:0', 'previous_day_admission_pediatric_covid_suspect

Task 2.B. Minhash

Create a “signature” for each hospital: Use the efficient Minhashing approach to convert the set representation of each hospital into 100 dimensions by using hashes on the set strings. Requirement: Do not store the hashed values of every potential set element in a broadcast variable it will be too large.

In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import random

spark = SparkSession.builder \
    .appName("MinHashing Hospitals Task 2") \
    .getOrCreate()

sc = spark.sparkContext

num_hashes = 100
max_hash = 2**31 - 1
random.seed(42)
hash_functions = [
    (random.randint(1, max_hash), random.randint(1, max_hash))
    for _ in range(num_hashes)
]

hash_broadcast = sc.broadcast(hash_functions)

def compute_hashes(element, hash_functions):
    return [((a * hash(element) + b) % max_hash) for a, b in hash_functions]

def min_hash_reduce(values):
    return min(values)

hospitals_rdd = spark.sparkContext.textFile("/content/trial_COVID-19_Hospital_Impact.csv", 32)

hospital_rdd = hospitals_rdd.map(lambda line: (
    line.split(",")[0],
    line.split(",")[1:]
))

signature_rdd = hospital_rdd.flatMap(lambda x: [
    ((i, x[0]), compute_hashes(feat, hash_broadcast.value)[i])
    for feat in x[1]
    for i in range(num_hashes)
])

signature_min_rdd = signature_rdd.reduceByKey(lambda x, y: min(x, y))

hospital_signature_rdd = signature_min_rdd.map(lambda x: (x[0][1], (x[0][0], x[1]))) \
    .groupByKey() \
    .mapValues(lambda sig: [val[1] for val in sorted(sig, key=lambda t: t[0])])

hospital_pks = ["150034", "050739", "330231", "241326", "070008"]
result_task_2 = hospital_signature_rdd.filter(lambda x: x[0] in hospital_pks).collect()

for hospital_id, signature in result_task_2:
    print(f"Hospital ID: {hospital_id}")
    print(f"Signature vector: {signature}")
with open("signatures_task_2.txt", "w") as f:
    for hospital_id, signature in result_task_2:
        f.write(f"{hospital_id},{signature}\n")


Hospital ID: 070008
Signature vector: [10098328, 122102293, 146665953, 3177446, 78838577, 16604389, 96429205, 17215421, 68252794, 37113690, 52881089, 10453654, 3081159, 198198587, 196279117, 145637412, 129264407, 49294848, 56301363, 104715602, 120345011, 11944121, 5431726, 77995789, 100590622, 219494903, 20210338, 22188815, 63521322, 104746197, 93309106, 141500264, 100303012, 88592688, 45229174, 40403739, 149160513, 105422517, 31199168, 55615623, 28956278, 22936120, 13971222, 29886880, 187522122, 196995516, 23117204, 57836919, 11556581, 8007663, 125164560, 119375963, 106112241, 177173260, 48561495, 16568724, 102837489, 28376083, 35140049, 79230628, 128324738, 28893119, 97876829, 2763653, 24691984, 120125831, 442019, 1234711, 118857525, 98467984, 60127955, 21308841, 21278276, 95599903, 122891732, 176865201, 48671859, 56969213, 69243352, 346161961, 35324216, 49181513, 109773004, 78156259, 38114121, 41953466, 52995789, 39133655, 107189496, 40050716, 8470306, 39434150, 128842773, 269322195

In [3]:
hospitals_rdd = spark.sparkContext.textFile("/content/trial_COVID-19_Hospital_Impact.csv", 32)
print(hospitals_rdd.take(10))


['hospital_pk,collection_week,state,ccn,hospital_name,address,city,zip,hospital_subtype,fips_code,is_metro_micro,total_beds_7_day_avg,all_adult_hospital_beds_7_day_avg,all_adult_hospital_inpatient_beds_7_day_avg,inpatient_beds_used_7_day_avg,all_adult_hospital_inpatient_bed_occupied_7_day_avg,inpatient_beds_used_covid_7_day_avg,total_adult_patients_hospitalized_confirmed_and_suspected_covid_7_day_avg,total_adult_patients_hospitalized_confirmed_covid_7_day_avg,total_pediatric_patients_hospitalized_confirmed_and_suspected_covid_7_day_avg,total_pediatric_patients_hospitalized_confirmed_covid_7_day_avg,inpatient_beds_7_day_avg,total_icu_beds_7_day_avg,total_staffed_adult_icu_beds_7_day_avg,icu_beds_used_7_day_avg,staffed_adult_icu_bed_occupancy_7_day_avg,staffed_icu_adult_patients_confirmed_and_suspected_covid_7_day_avg,staffed_icu_adult_patients_confirmed_covid_7_day_avg,total_patients_hospitalized_confirmed_influenza_7_day_avg,icu_patients_confirmed_influenza_7_day_avg,total_patients_hos

C. Find similar pairs using LSH
Run LSH to find approximately 20 candidates that are most similar to hospitals: 150034, 50739,
330231,241326, 70008. From the perspective of LSH, each hospital is a column with each row
being a value of the signatures. Tweak bands and rows per band in order to get approximately 20
candidates (i.e. anything between 10 to 30 candidates per hospital is ok).

In [3]:
from pyspark import SparkConf, SparkContext
from itertools import combinations
from datetime import datetime
import sys

APP_NAME = 'LSH'
SPLITTER = ','
BANDS = 5
ROWS = 4
NUM_CANDIDATES = (10, 30)
TARGET_HOSPITALS = {"150034", "50739", "330231", "241326", "070008"}
DEBUG = 0
PRINT_TIME = True
INPUT_FILE = '/content/trial_COVID-19_Hospital_Impact.csv'
OUTPUT_FILE = 'output_task_3.txt'

def getInputData(filename):
    _data = []
    try:
        with open(filename, 'r') as _fp:
            first_line = True
            for _each_line in _fp:
                if first_line:
                    first_line = False
                    continue
                _row = _each_line.strip().split(SPLITTER)
                if len(_row) > 1:
                    hospital_id = _row[0]
                    try:
                        numeric_values = [
                            int(float(x)) for x in _row[1:]
                            if x.strip().replace('.', '', 1).lstrip('-').isdigit()
                        ]
                        while len(numeric_values) < 100:
                            numeric_values.append(0)
                        _data.append((hospital_id, numeric_values[:100]))
                    except ValueError as e:
                        print(f"Skipping invalid row: {_row} -> Error: {e}")
        return _data
    except IOError as _err:
        print(f'File error: {_err}')
        exit()

def setOutputData(filename, results):
    with open(filename, 'w') as f:
        for target, matches in results.items():
            f.write(f"Target Hospital: {target}\n")
            seen = set()
            for match in sorted(matches, key=lambda x: -x[2]):
                matched_hospital = match[1]
                if matched_hospital in seen:
                    continue
                seen.add(matched_hospital)
                similarity = match[2]
                signature = match[3] if len(match) > 3 else []
                f.write(f"  Matched Hospital: {matched_hospital}, Jaccard Similarity: {similarity:.4f}, Signature: {signature[:10]}\n")

def jaccard_similarity(sig1, sig2):
    intersection = sum(1 for a, b in zip(sig1, sig2) if a == b)
    union = len(sig1)
    return intersection / union

def divide_into_bands(signature, bands, rows):
    for band in range(bands):
        start = band * rows
        end = start + rows
        yield band, tuple(signature[start:end])

if __name__ == "__main__":
    if PRINT_TIME:
        print(f'LSH=>Start=>{datetime.now()}')
    conf = SparkConf().setAppName(APP_NAME).setMaster("local[*]")
    sc = SparkContext(conf=conf)

    try:
        data_list = getInputData(INPUT_FILE)
        if not data_list:
            raise ValueError("No valid data loaded from the input file.")
    except Exception as e:
        print(f"Error loading input file: {e}")
        sc.stop()
        exit()

    rdd = sc.parallelize(data_list)

    def create_bands(hospital):
        hospital_id, signature = hospital
        bands = []
        for band, band_signature in divide_into_bands(signature, BANDS, ROWS):
            bands.append(((band, hash(band_signature)), hospital_id))
        return bands

    band_rdd = rdd.flatMap(create_bands)

    def candidate_pairs_generation(bucket):
        hospitals = list(bucket)
        for i, j in combinations(hospitals, 2):
            yield (i, j)

    candidates_rdd = band_rdd.groupByKey() \
        .filter(lambda x: len(x[1]) > 1) \
        .flatMap(lambda x: candidate_pairs_generation(x[1])).distinct()

    hospital_dict = dict(rdd.collect())

    def compute_similarity(pair):
        h1, h2 = pair
        if h1 in TARGET_HOSPITALS or h2 in TARGET_HOSPITALS:
            sig1 = hospital_dict[h1]
            sig2 = hospital_dict[h2]
            similarity = jaccard_similarity(sig1, sig2)
            return h1, h2, similarity, sig2[:10]
        return None

    results_rdd = candidates_rdd.map(compute_similarity).filter(lambda x: x is not None)

    def limit_candidates(results):
        sorted_results = sorted(results, key=lambda x: -x[2])
        return sorted_results[:NUM_CANDIDATES[1]]

    grouped_results = results_rdd.map(lambda x: (x[0] if x[0] in TARGET_HOSPITALS else x[1], x)) \
        .groupByKey() \
        .mapValues(limit_candidates).collectAsMap()

    print("Top Matches for Target Hospitals:")
    for target, matches in grouped_results.items():
        print(f"Target Hospital: {target}")
        seen = set()
        for match in sorted(matches, key=lambda x: -x[2]):
            matched_hospital = match[1]
            if matched_hospital in seen:
                continue
            seen.add(matched_hospital)
            print(f"  Matched Hospital: {matched_hospital}, Jaccard Similarity: {match[2]:.4f}, Signature: {match[3]}")

    setOutputData(OUTPUT_FILE, grouped_results)

    if PRINT_TIME:
        print(f'LSH=>Finish=>{datetime.now()}')
    sc.stop()


LSH=>Start=>2024-12-17 20:17:51.698452
Top Matches for Target Hospitals:
Target Hospital: 150034
  Matched Hospital: 150034, Jaccard Similarity: 1.0000, Signature: [150034, 46342, 18089, 288, 218, 218, 158, 158, 22, 22]
  Matched Hospital: 250019, Jaccard Similarity: 0.6500, Signature: [250019, 39502, 28047, 252, 244, 218, 189, 189, 22, 22]
  Matched Hospital: 370023, Jaccard Similarity: 0.6500, Signature: [370023, 73533, 40137, 77, 77, 58, 58, 58, 22, 22]
  Matched Hospital: 420068, Jaccard Similarity: 0.6100, Signature: [420068, 29115, 45075, 207, 207, 162, 128, 128, 4, 4]
  Matched Hospital: 100254, Jaccard Similarity: 0.6000, Signature: [100254, 32308, 12073, 310, 302, 258, 202, 201, 15, 15]
  Matched Hospital: 250082, Jaccard Similarity: 0.5900, Signature: [250082, 38704, 28151, 120, 120, 120, 88, 88, 7, 7]
  Matched Hospital: 340070, Jaccard Similarity: 0.5800, Signature: [340070, 27216, 37001, 238, 175, 175, 128, 128, 23, 23]
  Matched Hospital: 010144, Jaccard Similarity: 0.520

Task 1.A. Extract binary features (i.e. a sparse representation of the characteristic
matrix) per hospital
For each record, treat the first element as the unique record id. For each of the 108 other
columns, treat them as a binary feature with an id as string of the form:
"column_name:value" and add that to a set that represents the values that are "1" in the
characteristic matrix (every element of the set not present is assumed to be 0) : using test_COVID-19_Hospital_Impact.csv

In [2]:
from pyspark.sql import SparkSession


spark = SparkSession.builder.appName("BinaryFeatureExtraction").getOrCreate()

hospitals_rdd = spark.sparkContext.textFile("/content/test_COVID-19_Hospital_Impact.csv", 32)

header = hospitals_rdd.first()
header_list = header.split(",")
header_broadcast = spark.sparkContext.broadcast(header_list)

def binary_features_generation(row):
    values = row.split(",")
    if len(values) != len(header_broadcast.value):
        return None
    hospital_pk = values[0]
    features = set()
    for i, value in enumerate(values[1:], start=1):
        value = value.strip()
        if not value:
            continue
        col_name = header_broadcast.value[i]
        feature = f"{col_name}:{value}"
        features.add(feature)
    return (hospital_pk, features)

features_rdd = hospitals_rdd.filter(lambda x: x != header) \
    .map(binary_features_generation) \
    .filter(lambda x: x is not None)

reduced_rdd = features_rdd.reduceByKey(lambda x, y: x.union(y))

target_hospital_pks = {"150034", "50739", "330231", "241326", "070008"}
filtered_rdd = reduced_rdd.filter(lambda x: x[0] in target_hospital_pks)

results = filtered_rdd.collect()
for hospital_pk, features in results:
    print(f"({hospital_pk}, {features})")
with open("output_task.txt", "w") as f:
    for hospital_pk, features in results:
        f.write(f"{hospital_pk}, {features}\n")


(070008, {'total_adult_patients_hospitalized_confirmed_and_suspected_covid_7_day_sum:41', 'inpatient_beds_used_7_day_avg:30.1', 'total_adult_patients_hospitalized_confirmed_and_suspected_covid_7_day_sum:23', 'previous_day_admission_adult_covid_suspected_7_day_sum:13', 'total_adult_patients_hospitalized_confirmed_covid_7_day_sum:70', 'previous_day_admission_adult_covid_suspected_unknown_7_day_sum:7', 'staffed_icu_adult_patients_confirmed_covid_7_day_sum:9', 'total_adult_patients_hospitalized_confirmed_covid_7_day_sum:34', 'staffed_icu_adult_patients_confirmed_and_suspected_covid_7_day_sum:21', 'hospital_name:JOHNSON MEMORIAL HOSPITAL', 'staffed_icu_adult_patients_confirmed_and_suspected_covid_7_day_sum:30', 'inpatient_beds_used_7_day_avg:35.3', 'collection_week:2022/02/25', 'previous_day_covid_ED_visits_7_day_sum:184', 'all_adult_hospital_inpatient_bed_occupied_7_day_sum:240', 'all_adult_hospital_inpatient_bed_occupied_7_day_avg:27.3', 'inpatient_beds_used_covid_7_day_sum:32', 'inpatien

Task 2.B. Minhash

Create a “signature” for each hospital: Use the efficient Minhashing approach to convert the set representation of each hospital into 100 dimensions by using hashes on the set strings. Requirement: Do not store the hashed values of every potential set element in a broadcast variable it will be too large.

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
import random

spark = SparkSession.builder \
    .appName("MinHashing Hospitals Task 2") \
    .getOrCreate()

sc = spark.sparkContext

num_hashes = 100
max_hash = 2**31 - 1
random.seed(42)
hash_functions = [
    (random.randint(1, max_hash), random.randint(1, max_hash))
    for _ in range(num_hashes)
]

hash_broadcast = sc.broadcast(hash_functions)

def compute_hashes(element, hash_functions):
    return [((a * hash(element) + b) % max_hash) for a, b in hash_functions]

def min_hash_reduce(values):
    return min(values)

hospitals_rdd = spark.sparkContext.textFile("/content/test_COVID-19_Hospital_Impact.csv", 32)

hospital_rdd = hospitals_rdd.map(lambda line: (
    line.split(",")[0],
    line.split(",")[1:]
))

signature_rdd = hospital_rdd.flatMap(lambda x: [
    ((i, x[0]), compute_hashes(feat, hash_broadcast.value)[i])
    for feat in x[1]
    for i in range(num_hashes)
])

signature_min_rdd = signature_rdd.reduceByKey(lambda x, y: min(x, y))

hospital_signature_rdd = signature_min_rdd.map(lambda x: (x[0][1], (x[0][0], x[1]))) \
    .groupByKey() \
    .mapValues(lambda sig: [val[1] for val in sorted(sig, key=lambda t: t[0])])

hospital_pks = ["150034", "050739", "330231", "241326", "070008"]
result_task_3 = hospital_signature_rdd.filter(lambda x: x[0] in hospital_pks).collect()

for hospital_id, signature in result_task_2:
    print(f"Hospital ID: {hospital_id}")
    print(f"Signature vector: {signature}")
with open("signatures_task.txt", "w") as f:
    for hospital_id, signature in result_task_3:
        f.write(f"{hospital_id},{signature}\n")


C. Find similar pairs using LSH
Run LSH to find approximately 20 candidates that are most similar to hospitals: 150034, 50739,
330231,241326, 70008. From the perspective of LSH, each hospital is a column with each row
being a value of the signatures. Tweak bands and rows per band in order to get approximately 20
candidates (i.e. anything between 10 to 30 candidates per hospital is ok).

In [None]:
from pyspark import SparkConf, SparkContext
from itertools import combinations
from datetime import datetime
import sys

APP_NAME = 'LSH'
SPLITTER = ','
BANDS = 5
ROWS = 4
NUM_CANDIDATES = (10, 30)
TARGET_HOSPITALS = {"150034", "50739", "330231", "241326", "070008"}
DEBUG = 0
PRINT_TIME = True
INPUT_FILE = '/content/test_COVID-19_Hospital_Impact.csv'
OUTPUT_FILE = 'output_task.txt'

def getInputData(filename):
    _data = []
    try:
        with open(filename, 'r') as _fp:
            first_line = True
            for _each_line in _fp:
                if first_line:
                    first_line = False
                    continue
                _row = _each_line.strip().split(SPLITTER)
                if len(_row) > 1:
                    hospital_id = _row[0]
                    try:
                        numeric_values = [
                            int(float(x)) for x in _row[1:]
                            if x.strip().replace('.', '', 1).lstrip('-').isdigit()
                        ]
                        while len(numeric_values) < 100:
                            numeric_values.append(0)
                        _data.append((hospital_id, numeric_values[:100]))
                    except ValueError as e:
                        print(f"Skipping invalid row: {_row} -> Error: {e}")
        return _data
    except IOError as _err:
        print(f'File error: {_err}')
        exit()

def setOutputData(filename, results):
    with open(filename, 'w') as f:
        for target, matches in results.items():
            f.write(f"Target Hospital: {target}\n")
            seen = set()
            for match in sorted(matches, key=lambda x: -x[2]):
                matched_hospital = match[1]
                if matched_hospital in seen:
                    continue
                seen.add(matched_hospital)
                similarity = match[2]
                signature = match[3] if len(match) > 3 else []
                f.write(f"  Matched Hospital: {matched_hospital}, Jaccard Similarity: {similarity:.4f}, Signature: {signature[:10]}\n")

def jaccard_similarity(sig1, sig2):
    intersection = sum(1 for a, b in zip(sig1, sig2) if a == b)
    union = len(sig1)
    return intersection / union

def divide_into_bands(signature, bands, rows):
    for band in range(bands):
        start = band * rows
        end = start + rows
        yield band, tuple(signature[start:end])

if __name__ == "__main__":
    if PRINT_TIME:
        print(f'LSH=>Start=>{datetime.now()}')
    conf = SparkConf().setAppName(APP_NAME).setMaster("local[*]")
    sc = SparkContext(conf=conf)

    try:
        data_list = getInputData(INPUT_FILE)
        if not data_list:
            raise ValueError("No valid data loaded from the input file.")
    except Exception as e:
        print(f"Error loading input file: {e}")
        sc.stop()
        exit()

    rdd = sc.parallelize(data_list)

    def create_bands(hospital):
        hospital_id, signature = hospital
        bands = []
        for band, band_signature in divide_into_bands(signature, BANDS, ROWS):
            bands.append(((band, hash(band_signature)), hospital_id))
        return bands

    band_rdd = rdd.flatMap(create_bands)

    def candidate_pairs_generation(bucket):
        hospitals = list(bucket)
        for i, j in combinations(hospitals, 2):
            yield (i, j)

    candidates_rdd = band_rdd.groupByKey() \
        .filter(lambda x: len(x[1]) > 1) \
        .flatMap(lambda x: candidate_pairs_generation(x[1])).distinct()

    hospital_dict = dict(rdd.collect())

    def compute_similarity(pair):
        h1, h2 = pair
        if h1 in TARGET_HOSPITALS or h2 in TARGET_HOSPITALS:
            sig1 = hospital_dict[h1]
            sig2 = hospital_dict[h2]
            similarity = jaccard_similarity(sig1, sig2)
            return h1, h2, similarity, sig2[:10]
        return None

    results_rdd = candidates_rdd.map(compute_similarity).filter(lambda x: x is not None)

    def limit_candidates(results):
        sorted_results = sorted(results, key=lambda x: -x[2])
        return sorted_results[:NUM_CANDIDATES[1]]

    grouped_results = results_rdd.map(lambda x: (x[0] if x[0] in TARGET_HOSPITALS else x[1], x)) \
        .groupByKey() \
        .mapValues(limit_candidates).collectAsMap()

    print("Top Matches for Target Hospitals:")
    for target, matches in grouped_results.items():
        print(f"Target Hospital: {target}")
        seen = set()
        for match in sorted(matches, key=lambda x: -x[2]):
            matched_hospital = match[1]
            if matched_hospital in seen:
                continue
            seen.add(matched_hospital)
            print(f"  Matched Hospital: {matched_hospital}, Jaccard Similarity: {match[2]:.4f}, Signature: {match[3]}")

    setOutputData(OUTPUT_FILE, grouped_results)

    if PRINT_TIME:
        print(f'LSH=>Finish=>{datetime.now()}')
    sc.stop()
