In [1]:
# TASK 1 - Spark RDDs: Scalable COVID-19-related Applications


from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
from io import StringIO
import pandas as pd
from pyspark.sql.functions import col, expr
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from faker import Faker
import random
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import csv
import random
import string

# Spark configuration: more executor/driver memory and a longer network
# timeout than the defaults, presumably because the cartesian joins in the
# query cells compare roughly 5,000 x 95,000 candidate pairs.
conf = (
    SparkConf()
    .setAppName("Covid")
    .set("spark.executor.memory", "4g")
    .set("spark.driver.memory", "4g")
    .set("spark.network.timeout", "600s")
)

# NOTE: `spark` holds a SparkContext (not a SparkSession); later cells call
# `spark.textFile(...)`, `spark.broadcast(...)` and `spark.stop()` on it.
# getOrCreate makes this cell safe to re-run in the same kernel: a second
# `SparkContext(conf=...)` would raise "Cannot run multiple SparkContexts at once".
spark = SparkContext.getOrCreate(conf=conf)




# STEP 1 - DATA CREATION

def generate_concert_data(name, size, seed=None):
    """Generate ``size`` synthetic people and write them to ``f"{name}.csv"``.

    Columns: ID (1..size), X and Y (integer coordinates in [1, 10000]; no two
    people share the same (X, Y) point), Name (from Faker) and Age (integer
    in [10, 100]). The CSV is written without the pandas index.

    Args:
        name: Output file stem; ".csv" is appended.
        size: Number of people to generate.
        seed: Optional seed. When given, coordinates, ages and Faker names are
            reproducible. When ``None`` (default), the module-level ``random``
            generator and an unseeded Faker are used.

    Raises:
        ValueError: If ``size`` exceeds the number of distinct (X, Y) points,
            in which case the uniqueness loop could never terminate.
    """
    coord_low, coord_high = 1, 10000
    available_points = (coord_high - coord_low + 1) ** 2
    if size > available_points:
        raise ValueError(
            f"size={size} exceeds the {available_points} distinct (X, Y) points available"
        )

    # A dedicated RNG keeps seeded runs independent of other users of `random`;
    # without a seed, fall back to the shared module-level generator.
    rng = random.Random(seed) if seed is not None else random
    fake = Faker()
    if seed is not None:
        fake.seed_instance(seed)

    data = []
    used_coordinates = set()

    for person_id in range(1, size + 1):
        # randint gives every integer coordinate equal probability (rounding a
        # uniform float would under-weight the two endpoints).
        x = rng.randint(coord_low, coord_high)
        y = rng.randint(coord_low, coord_high)

        # Re-draw until the point is unused so every person has a unique location.
        while (x, y) in used_coordinates:
            x = rng.randint(coord_low, coord_high)
            y = rng.randint(coord_low, coord_high)

        used_coordinates.add((x, y))

        data.append({
            "ID": person_id,
            "X": x,
            "Y": y,
            "Name": fake.name(),
            "Age": rng.randint(10, 100),
        })
    pd.DataFrame(data).to_csv(f"{name}.csv", index=False)
generate_concert_data('PEOPLE_large', 100000)




# Reading the generated people data back from the CSV file
people_large = pd.read_csv('PEOPLE_large.csv')


# Creating and storing the infected small dataset: a random sample of 5,000 people
infected_small = people_large.sample(5000).reset_index(drop=True)
infected_small.to_csv('INFECTED_small.csv', index=False)


# Creating and storing the "people, some infected" large dataset.
# Start from a copy of the already-loaded frame instead of re-reading the CSV.
some_infected = people_large.copy()
# A single hashed `isin` pass flags each row as 'Yes'/'No'. A per-row
# `i in list(...)` test would rebuild and scan a 5,000-element list for every
# one of the 100,000 rows.
some_infected['INFECTED'] = (
    some_infected['ID']
    .isin(infected_small['ID'])
    .map({True: 'Yes', False: 'No'})
)
# The pandas index is deliberately written as the first column (no
# index=False): the Q3 cell parses this file as index, ID, X, Y, Name, Age, INFECTED.
some_infected.to_csv('SOME_INFECTED_large.csv')

# Storing the data as RDDs
infected_rdd = spark.textFile("INFECTED_small.csv")
person_rdd = spark.textFile("PEOPLE_large.csv")



In [2]:
# STEP 2 - QUERIES


# Q1 - Non-infected people from PEOPLE_large who were in close contact (distance <= 6) with an infected person from INFECTED_small; output pairs (person ID, infected ID)

def csv_(line):
    """Return the comma-separated fields of ``line`` as a list of strings.

    This is a plain split on ',' with no quote handling, so it assumes no
    field contains a comma (TODO confirm for all generated names).
    """
    separator = ','
    return str.split(line, separator)

def distance_between(point1, point2):
    """Euclidean distance between two person records.

    Only index 1 (X) and index 2 (Y) of each record are used; any other
    fields (ID, name, age, ...) are ignored.
    """
    dx = point1[1] - point2[1]
    dy = point1[2] - point2[2]
    squared = dx ** 2 + dy ** 2
    return squared ** 0.5

# Parse both CSVs into field lists, dropping the header row (first field 'ID').
infected_q1 = infected_rdd.map(csv_).filter(lambda x: x[0] != 'ID')
people_q1 = person_rdd.map(csv_).filter(lambda x: x[0] != 'ID')

# (ID, X, Y, Name, Age) tuples with numeric coordinates; IDs stay strings.
infected_coordinates = infected_q1.map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4]))

# Exclude the infected people themselves from the candidate contacts.
# A set gives O(1) membership tests (a list would cost O(#infected) per
# person); it is broadcast as a read-only variable for the executors.
infected_id = set(infected_coordinates.map(lambda x: x[0]).collect())
infected_id_bc = spark.broadcast(infected_id)

people_coordinates = (
    people_q1
    .map(lambda x: (x[0], float(x[1]), float(x[2]), x[3], x[4]))
    .filter(lambda x: x[0] not in infected_id_bc.value)
)

# Compare every infected person with every remaining person and keep those
# within distance 6, as (person_id, infected_id) pairs. The (small) matched
# result is cached so any reuse of `close_contacts` does not rerun the
# cartesian product.
close_contacts = (
    infected_coordinates.cartesian(people_coordinates)
    .filter(lambda pair: distance_between(pair[0], pair[1]) <= 6)
    .map(lambda pair: (pair[1][0], pair[0][0]))
    .cache()
)

# Collecting the results to the driver
result_list = close_contacts.collect()

output_file_path_q1 = "Query_1_Output.txt"

# Writing one "person_id, infected_id" line per contact to the text file
with open(output_file_path_q1, "w") as out:
    for pair in result_list:
        out.write(f"{pair[0]}, {pair[1]}\n")

In [3]:
# Q2 - Distinct people from the Q1 result (each person listed once, however
# many infected people they were close to)

# `close_contacts` (built in the Q1 cell) is an RDD of (person_id, infected_id)
# pairs. Keep only the person IDs and de-duplicate them. This reuses the Q1
# lineage instead of duplicating its parsing/filtering/cartesian code; if
# `close_contacts` is cached, the join is not recomputed either.
distinct_close_contacts = close_contacts.map(lambda pair: pair[0]).distinct()

# Collecting the results to the driver
distinct_result_list = distinct_close_contacts.collect()

output_file_path_q2 = "Query_2_Output.txt"

# Writing one person ID per line to the text file
with open(output_file_path_q2, "w") as out:
    for person_id in distinct_result_list:
        out.write(f"{person_id}\n")

In [4]:
# Q3 - Each infected person with the number of people they were in close contact with

# SOME_INFECTED_large.csv was saved together with its pandas index, so each
# line reads: index, ID, X, Y, Name, Age, INFECTED. The header line starts
# with an empty index field, which the filter drops.
some_infected_rdd = (
    spark.textFile("SOME_INFECTED_large.csv")
    .map(csv_)
    .filter(lambda x: x[0] != '')
    .map(lambda x: (x[1], float(x[2]), float(x[3]), x[4], x[5], x[6]))
)

# Tuples are (ID, X, Y, Name, Age, INFECTED).
people_infected_yes = some_infected_rdd.filter(lambda x: x[5] == 'Yes')
people_infected_no = some_infected_rdd.filter(lambda x: x[5] == 'No')

# Pair every infected person with every non-infected one and keep those within
# distance 6, as (contact_id, contact_name, infected_id, infected_name).
close_contacts_cross = (
    people_infected_yes.cartesian(people_infected_no)
    .filter(lambda pair: distance_between(pair[0], pair[1]) <= 6)
    .map(lambda pair: (pair[1][0], pair[1][3], pair[0][0], pair[0][3]))
)

# Seed every infected person with a count of 0 so that people with no close
# contacts still appear in the output, then add 1 per close contact.
zero_counts = people_infected_yes.map(lambda person: (person[0], 0))
contact_counts = close_contacts_cross.map(lambda x: (x[2], 1))
infected_counts = (
    zero_counts.union(contact_counts)
    .reduceByKey(lambda a, b: a + b)
    .collect()
)

output_file_path_q3 = "Query_3_Output.txt"

# Writing one "infected_id, count" line per infected person to the text file
with open(output_file_path_q3, "w") as out:
    for pair in infected_counts:
        out.write(f"{pair[0]}, {pair[1]}\n")

In [5]:
# Ending the Spark application: shut down the SparkContext stored in `spark`
# (it is a SparkContext, not a SparkSession).
spark.stop()