In [1]:
import pyspark
import requests
import datetime
import concurrent.futures
import glob
import pandas as pd
from pyspark.sql import SparkSession 
import json 

spark = SparkSession.\
        builder.\
        appName("pyspark-notebook").\
        master("spark://spark-master:7077").\
        config("spark.executor.memory", "512m").\
        getOrCreate()



In [2]:
def main(noise, joined_columns):
    noise = noise
    hashed_column = 'id'
    start = datetime.datetime.now()
    
    request = requests.get(f'http://cluster-a:9200//take_data/{hashed_column}/{noise}')
    url_content = request.content
    csv_file = open("/opt/workspace/a_download.csv", 'wb')
    csv_file.write(url_content)
    csv_file.close()

    request = requests.get(f'http://cluster-b:9300//take_data/{hashed_column}/{noise}')
    url_content = request.content
    csv_file = open("/opt/workspace/b_download.csv", 'wb')
    csv_file.write(url_content)
    csv_file.close()
    
    download_time = datetime.datetime.now() - start
    
    df_1 = spark.read.csv(path="/opt/workspace/a_download.csv", sep=",", header=True)
    df_2 = spark.read.csv(path="/opt/workspace/b_download.csv", sep=",", header=True)

    df_1.createOrReplaceTempView("a_cluster_data")
    df_2.createOrReplaceTempView("b_cluster_data")

    start = datetime.datetime.now()
    
    if joined_columns == 1:
        
        sql_result_1 = spark.sql("    SELECT DISTINCT a.name, a.surname\
                                      FROM a_cluster_data, b_cluster_data\
                                      JOIN a_cluster_data a ON a.name == b_cluster_data.name\
                                  ")
        
        sql_result_2 = spark.sql("    SELECT DISTINCT b.name, b.surname\
                                      FROM a_cluster_data, b_cluster_data\
                                      JOIN b_cluster_data b ON b.name == a_cluster_data.name\
                                  ")
        
    elif joined_columns == 2:

        sql_result_1 = spark.sql("  SELECT DISTINCT a.name, a.surname\
                                    FROM a_cluster_data, b_cluster_data\
                                    JOIN a_cluster_data a \
                                    ON a.name == b_cluster_data.name and a.surname == b_cluster_data.surname\
                                  ")
        
        sql_result_2 = spark.sql("  SELECT DISTINCT b.name, b.surname\
                                    FROM a_cluster_data, b_cluster_data\
                                    JOIN b_cluster_data b \
                                    ON b.name == a_cluster_data.name and b.surname == a_cluster_data.surname\
                                  ")     
        
    sql_result_1.repartition(1).write.mode('overwrite').csv("/opt/workspace//a_joined_result", header=True)
    sql_result_2.repartition(1).write.mode('overwrite').csv("/opt/workspace//b_joined_result", header=True)

    joined_time = datetime.datetime.now() - start


    URLS = ['http://cluster-a:9200//take_data/', 
            'http://cluster-b:9300//take_data/']
    
    def send_data(url, myfiles):
        myfiles =  {'file': open(myfiles,'rb')}  
        requests.post(f'{url}', files = myfiles)
    
    myfiles = list()
    directoryPath= '/opt/workspace/a_joined_result/'
    for file_name in glob.glob(directoryPath+'*.csv'):
        myfiles.append(file_name)
    
    directoryPath= '/opt/workspace/b_joined_result/'
    for file_name in glob.glob(directoryPath+'*.csv'):
        myfiles.append(file_name)
        
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        task = {executor.submit(send_data, url, myfile): url for url,myfile in zip(URLS, myfiles)}

        for future in concurrent.futures.as_completed(task):
            url = task[future]
            try:
                data = future.result()
            except Exception as exc:
                print('%r generated an exception: %s' % (url, exc))
  

    result = requests.get(f"http://cluster-b:9300//return_statistics")
    a_cluster_result = json.loads(result.content)
    
 
    result = requests.get(f"http://cluster-a:9200//return_statistics")
    b_cluster_result = json.loads(result.content)
    
 
    TP_1 = int(a_cluster_result['TP'])
    TP_2 = int(b_cluster_result['TP'])
    

    
    FP_1 = int(a_cluster_result['FP'])
    FP_2 = int(a_cluster_result['FP'])
    
    FN_1 = int(a_cluster_result['FN'])
    FN_2 = int(a_cluster_result['FN'])
    
    
    
    precision = (TP_1 + TP_2) / (  TP_1 + TP_2 + FP_1 + FP_2 )
    recall    = (TP_1 + TP_2) / (  TP_1 + TP_2 + FN_1 + FN_2 )
    
    return pd.DataFrame([[noise, download_time, joined_time ,precision, recall, joined_columns]], columns=['noise', 'download_time', 'joined_time', 'precision', 'recall', 'joined'])
    

In [5]:
result = pd.DataFrame(None, columns=['noise', 'download_time', 'joined_time', 'precision', 'recall', 'joined'])
for y in range (1,3):
    for x in range (0,300,10):
        result = pd.concat([result, main(x,y)], axis=0)


In [48]:
result.to_csv('final_data.csv', encoding='utf-8', header=True, index=False)

In [6]:
print(result)

  noise          download_time            joined_time  precision    recall  \
0     0 0 days 00:00:00.122239 0 days 00:00:07.453532   1.000000  1.000000   
0    10 0 days 00:00:00.076123 0 days 00:00:07.418078   0.995280  0.010960   
0    20 0 days 00:00:00.122202 0 days 00:00:08.492670   0.976892  0.005359   
0    30 0 days 00:00:00.123766 0 days 00:00:09.169057   0.972618  0.003542   
0    40 0 days 00:00:00.100297 0 days 00:00:08.414164   0.948632  0.002597   
0    50 0 days 00:00:00.106400 0 days 00:00:13.470666   0.925081  0.002028   
0    60 0 days 00:00:00.092818 0 days 00:00:10.562038   0.915099  0.001660   
0    70 0 days 00:00:00.105570 0 days 00:00:12.099529   0.883505  0.001368   
0    80 0 days 00:00:00.197422 0 days 00:00:14.597699   0.870117  0.001176   
0    90 0 days 00:00:00.111069 0 days 00:00:11.046828   0.838207  0.001007   
0   100 0 days 00:00:00.121358 0 days 00:00:13.440520   0.812677  0.000882   
0   110 0 days 00:00:00.119765 0 days 00:00:18.058171   0.786569