In [None]:
from pyspark.sql import SparkSession

# Bind the name up front so the error handler can tell whether a session was
# actually created (otherwise a failed start-up raises NameError in the handler
# and hides the real error).
spark = None

try:
    # Create (or reuse) the Spark session used by every cell below.
    spark = SparkSession.builder.appName("ParquetReader").getOrCreate()

    # Any further setup that needs the session can go here.

except Exception as e:
    print("Error:", str(e))
    # Print additional diagnostic information
    print("Check Spark installation, resource availability, port availability, and firewall settings.")

    if spark is not None:
        # A session exists, so the failure happened after it was created:
        # dump its configuration for extra insight, then release it.
        print("Spark Configuration:")
        print(spark.sparkContext.getConf().getAll())
        spark.stop()

    # Re-raise so the cell fails visibly instead of letting later cells run
    # without a usable session.
    raise


In [None]:

# Locations of the two Parquet inputs.
hospital_data_path = "data/hospital_data/hospital_data.parquet"
covid_data_path = "data/covid_dataset.parquet"

# Load the hospital dataset and preview its first ten rows.
hospital_df = spark.read.format("parquet").load(hospital_data_path)
hospital_df.show(n=10)

In [None]:

# Load the COVID dataset from its Parquet file and preview the first ten rows.
covid_data_path = "data/covid_dataset.parquet"
covid_df = spark.read.format("parquet").load(covid_data_path)
covid_df.show(n=10)


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Number of covid_df rows per city (one output row per city).
patient_count = (
    covid_df
    .groupBy('City')
    .count()
    .withColumnRenamed('count', 'p_count')
)

# Pair every covid_df row with every hospital_df row that shares its City.
joined_df = covid_df.join(hospital_df, on='City')

# Attach the per-city row count to each joined row and preview the result.
merged_df = joined_df.join(patient_count, on='City')
merged_df.show(n=10)

In [None]:
# Back up the merged dataframe as CSV with a header row, replacing any
# output left by a previous run.
(
    merged_df.write
    .mode('overwrite')
    .option('header', True)
    .csv('data/merged_data/joined.csv')
)

In [None]:
# Per-city difference between available COVID beds and covid_df rows.
#
# The two totals are computed from their own source tables and only then
# joined. Aggregating over joined_df would be wrong: it holds one row per
# (covid row, hospital) pair in a city, so the bed sum would be multiplied by
# the number of covid rows and the row count by the number of hospitals.
#
# NOTE(review): assumes COVID_Beds_Available is a column of hospital_df and
# that each covid_df row is one patient -- confirm against the data.
beds_per_city = hospital_df.groupBy('City').agg(
    F.sum('COVID_Beds_Available').alias('total_beds')
)
patients_per_city = covid_df.groupBy('City').agg(
    F.count('*').alias('patients')
)

# Inner join keeps the same cities as before: those present in both tables.
# The value is beds minus rows, so a negative number means a bed shortfall.
poor_patient = beds_per_city.join(patients_per_city, 'City').select(
    'City',
    (F.col('total_beds') - F.col('patients')).alias('Poor_Patients'),
)

# A bare DataFrame expression only displays the schema; show the rows instead.
poor_patient.show(10)

In [None]:
# Total COVID beds per city, summed from the hospital table itself so each
# hospital row contributes once. Summing over merged_df would repeat every
# hospital once per covid_df row in the same city and inflate the total.
# NOTE(review): assumes COVID_Beds_Available is a column of hospital_df -- confirm.
covid_bed_citywise = hospital_df.groupBy('City').agg(
    F.sum('COVID_Beds_Available').alias('Covid_beds_citywise')
)

# Attach the city-level bed total to every merged row and preview.
new_df = covid_bed_citywise.join(merged_df, 'City', 'inner')
new_df.show(10)

In [None]:
# Compare Pune's city-wide bed total with its patient count, one row per hospital.

# Keep only the rows for Pune.
pune_df = new_df.where(F.col('City') == 'Pune')

# Collapse to a single row per hospital name.
# NOTE(review): dropDuplicates keeps an arbitrary row for each Hospital_Name, so
# the other columns carried along are not guaranteed to be stable between runs.
unique_df = pune_df.dropDuplicates(subset=['Hospital_Name'])

# NOTE(review): this is beds minus patient count, so a shortfall appears as a
# negative number -- confirm that is the intended sign for "patient_no_beds".
beds_minus_patients = unique_df['Covid_beds_citywise'] - unique_df['p_count']
result_df = unique_df.withColumn("patient_no_beds", beds_minus_patients)

result_df.show(n=10)


In [None]:
# Back up the result dataframe as CSV with a header row, replacing any
# output left by a previous run.
(
    result_df.write
    .mode('overwrite')
    .option('header', True)
    .csv('data/result_data/result.csv')
)