In [1]:
from http.server import BaseHTTPRequestHandler, HTTPServer
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
from glob import glob
import json

class HTTPRequestHandler(BaseHTTPRequestHandler):
    """HTTP handler that reports the least affected country from CSV data.

    ``GET /least_affected_country`` reads every ``*.csv`` file in
    ``data_directory`` with Spark, computes ``deaths / cases`` per row and
    answers with JSON ``{"least_affected_country": <country>}`` for the row
    with the lowest ratio. When no country can be determined it answers 404
    with a JSON ``{"error": ...}`` body; unexpected failures answer 500.
    Every other path gets a plain-text 404.
    """

    # Directory scanned for ``*.csv`` input files. A class attribute so it can
    # be overridden on a subclass or an instance without editing the handler.
    data_directory = "/Users/saumya/Documents/spark-project/"

    def do_GET(self):
        """Route a GET request; any query string is ignored for routing."""
        route = self.path.split('?', 1)[0]
        if route != '/least_affected_country':
            # Send 404 response for other paths
            self._send_body(404, 'text/plain', b'404 Not Found')
            return

        # Look for input before starting Spark: with no files there is nothing
        # to merge (indexing the first DataFrame would raise).
        file_paths = sorted(glob(os.path.join(self.data_directory, "*.csv")))
        if not file_paths:
            self._send_json(404, {'error': 'no CSV files found'})
            return

        try:
            country = self._least_affected_country(file_paths)
        except Exception as exc:
            # Request boundary: log and answer 500 instead of letting the
            # server drop the connection without any response.
            self.log_error("least_affected_country failed: %r", exc)
            self._send_json(500, {'error': 'internal server error'})
            return

        if country is None:
            self._send_json(404, {'error': 'no row with a usable deaths/cases ratio'})
            return

        self._send_json(200, {'least_affected_country': country})

    @staticmethod
    def _least_affected_country(file_paths):
        """Return the country with the lowest deaths/cases ratio, or None.

        Rows whose ``cases`` is missing or not positive, whose ``deaths`` is
        missing, or whose ``country`` is missing are ignored. Ties on the
        ratio are broken by country name so the answer is deterministic.
        """
        # NOTE(review): getOrCreate() returns an already-active session if one
        # exists, and the spark.stop() below shuts it down; a long-lived
        # session would also avoid the per-request startup cost.
        spark = SparkSession.builder \
            .appName("Least Affected Country") \
            .getOrCreate()
        try:
            dfs = [
                spark.read.csv(path, header=True, inferSchema=True)
                # Exclude cases <= 0 (and null) before dividing: division by
                # zero yields null (or raises under ANSI mode).
                .filter(col("cases") > 0)
                .withColumn("Death_Case_Ratio", col("deaths") / col("cases"))
                for path in file_paths
            ]

            # union() matches columns by position; assumes every file shares
            # the same column layout — TODO confirm.
            merged_df = dfs[0]
            for df in dfs[1:]:
                merged_df = merged_df.union(df)

            row = (
                merged_df
                # Spark sorts nulls first in ascending order, so null ratios
                # must be removed or they would be reported as the minimum.
                .filter(col("Death_Case_Ratio").isNotNull() & col("country").isNotNull())
                .orderBy(col("Death_Case_Ratio"), col("country"))
                .select("country")
                .first()
            )
        finally:
            # Stop the SparkSession even if reading or the query fails.
            spark.stop()
        return None if row is None else row["country"]

    def _send_json(self, status, payload):
        """Send *payload* serialized as a JSON response with *status*."""
        self._send_body(status, 'application/json', json.dumps(payload).encode())

    def _send_body(self, status, content_type, body):
        """Send a complete response: status line, headers, then *body* bytes."""
        self.send_response(status)
        self.send_header('Content-type', content_type)
        self.send_header('Content-Length', str(len(body)))
        self.end_headers()
        self.wfile.write(body)

def run_server(port=8001, host=''):
    """Serve ``HTTPRequestHandler`` on ``(host, port)`` until interrupted.

    ``host=''`` binds all interfaces (the previous hard-coded behavior).
    The server is used as a context manager so its listening socket is
    closed when ``serve_forever`` exits, e.g. on ``KeyboardInterrupt`` from
    a notebook "interrupt kernel". Without that, the port stays bound and
    re-running the cell fails until the process restarts.
    """
    server_address = (host, port)
    with HTTPServer(server_address, HTTPRequestHandler) as httpd:
        print(f'Starting server on port {port}...')
        httpd.serve_forever()

if __name__ == "__main__":
    # Script entry point: start the server with run_server's defaults and
    # block serving requests until the process is interrupted.
    run_server()


Starting server on port 8001...


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/05/02 15:25:38 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/05/02 15:25:38 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/05/02 15:25:38 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
127.0.0.1 - - [02/May/2024 15:25:41] "GET /least_affected_country HTTP/1.1" 200 -
127.0.0.1 - - [02/May/2024 15:25:41] "GET /favicon.ico HTTP/1.1" 404 -
