'''
<br><br>

@Author: Shivraj Yelave<br>
@Date: 03-09-24<br>
@Last modified by: Shivraj Yelave<br>
@Last modified date: 04-09-24<br>
@Title: Covid problems using pyspark <br>
<br><br>

'''

## 1. To find out the death percentage locally and globally

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Initialize Spark Session
spark = SparkSession.builder.appName("DeathPercentage").getOrCreate()

# Read CSV file into DataFrame
data = spark.read.csv('file:///C:/Users/Admin/Documents/pyspark/PYSPARK/country_wise_latest.csv', header=True, inferSchema=True)

# Calculate the death percentage for each country
data = data.withColumn("Death Percentage", col("Deaths") / col("Confirmed") * 100)

# Filter the DataFrame for a specific country (e.g., India)
local_death_percentage = data.filter(col("Country/Region") == 'India').select("Death Percentage")
local_death_percentage.show()

# Calculate the global death percentage
# Use sum as _sum to avoid conflict with column name
global_death_percentage = data.select(
    (sum(col("Deaths")) / sum(col("Confirmed")) * 100).alias("Global Death Percentage")
)
global_death_percentage.show()

# Stop session
spark.stop()


+------------------+
|  Death Percentage|
+------------------+
|2.2571859631247917|
+------------------+

+-----------------------+
|Global Death Percentage|
+-----------------------+
|      3.968548255709708|
+-----------------------+



## 2.To find out the infected population percentage locally and globally

In [49]:
#import modules
from pyspark.sql import SparkSession  # Import SparkSession to create a Spark application
from pyspark.sql.functions import col, count, sum, avg, max, min, round  # Import functions for DataFrame operations

#create session
spark = SparkSession.builder.appName("question2").getOrCreate()  # Create a Spark session with the name "question2"

#load data
data = spark.read.csv("file:///C:/Users/Admin/Documents/pyspark/PYSPARK/worldometer_data.csv", header=True, inferSchema=True)  # Load CSV data into a DataFrame with header and inferred schema

# Calculate percentage of local infections in India
infected_locally = data.filter(col('Country/Region') == 'India').select(['Country/Region', (col('TotalCases') / col('Population') * 100).alias('Local Infected Percent')]).withColumn('Local Infected Percent', round(col('Local Infected Percent'), 4))  # Round the percentage to 4 decimal places

infected_locally.show()  # Display the results for local infections in India

# Calculate percentage of global infections
infected_globally = data.select([(sum(col('TotalCases')) / sum(col('Population')) * 100).alias('Global Infected Percent')]).withColumn('Global Infected Percent', round(col('Global Infected Percent'), 4))  # Round the percentage to 4 decimal places

infected_globally.show()  # Display the results for global infections

spark.stop()  # Stop the Spark session to release resources


+--------------+----------------------+
|Country/Region|Local Infected Percent|
+--------------+----------------------+
|         India|                0.1466|
+--------------+----------------------+

+-----------------------+
|Global Infected Percent|
+-----------------------+
|                  0.303|
+-----------------------+



## 3. To find out the countries with the highest infection rates

In [59]:
from pyspark.sql import SparkSession  # Import SparkSession to create a Spark application

spark = SparkSession.builder.appName("question 3").getOrCreate()  # Create a Spark session with the name "question 3"

data = spark.read.csv("file:///C:/Users/Admin/Documents/pyspark/PYSPARK/worldometer_data.csv", header=True, inferSchema=True)  # Load CSV data into a DataFrame with header and inferred schema

# Select relevant columns and calculate the percentage of infected cases
required_data = data.select([  # Select columns from the DataFrame
    col('Country/Region').alias("Countries"),  # Rename 'Country/Region' column to 'Countries'
    (col('TotalCases') / col('Population') * 100).alias('Infected Percent')  # Calculate infected percentage and alias it as 'Infected Percent'
]).orderBy(col('Infected Percent'), ascending=False)  # Order the DataFrame by 'Infected Percent' in descending order

required_data = required_data.withColumn('Infected Percent', round(col('Infected Percent'), 3))  # Round the 'Infected Percent' to 3 decimal places

required_data = required_data.limit(10)  # Limit the result to the top 10 rows

required_data.show()  # Display the DataFrame showing the top 10 countries with the highest infection percentages

spark.stop()  # Stop the Spark session to release resources


+-------------+----------------+
|    Countries|Infected Percent|
+-------------+----------------+
|        Qatar|           3.992|
|French Guiana|           2.715|
|      Bahrain|           2.513|
|   San Marino|            2.06|
|        Chile|           1.916|
|       Panama|           1.653|
|       Kuwait|           1.638|
|         Oman|           1.577|
|          USA|           1.519|
| Vatican City|           1.498|
+-------------+----------------+



## 4. To find out the countries and continents with the highest death counts

In [65]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum

# Create a Spark session
spark = SparkSession.builder.appName('question 4').getOrCreate()

# Load the CSV data into a DataFrame
data = spark.read.csv('file:///C:/Users/Admin/Documents/pyspark/PYSPARK/worldometer_data.csv', header=True, inferSchema=True)

# Select the top 10 countries with the highest number of deaths
required_data = data.select([col('Country/Region').alias('Countries'), col('TotalDeaths').alias('Deaths')]) \
                    .orderBy(col('Deaths'), ascending=False) \
                    .limit(10)

# Show the result for countries with the most deaths
required_data.show()

# Group by WHO Region (Continents) and sum the total deaths, then order by deaths in descending order
required_data2 = data.groupBy(col('WHO Region').alias('Continents')) \
                    .agg(sum(col('TotalDeaths')).alias('Deaths')) \
                    .orderBy(col('Deaths'), ascending=False) \
                    .limit(10)

# Show the result for continents with the most deaths
required_data2.na.drop().show()

# Stop the Spark session
spark.stop()


+---------+------+
|Countries|Deaths|
+---------+------+
|      USA|162804|
|   Brazil| 98644|
|   Mexico| 50517|
|       UK| 46413|
|    India| 41638|
|    Italy| 35187|
|   France| 30312|
|    Spain| 28500|
|     Peru| 20424|
|     Iran| 17976|
+---------+------+

+--------------------+------+
|          Continents|Deaths|
+--------------------+------+
|            Americas|384637|
|              Europe|215564|
|      South-EastAsia| 50624|
|EasternMediterranean| 42376|
|              Africa| 15538|
|      WesternPacific|  3975|
+--------------------+------+



## 5. Average number of deaths by day (Continents and Countries)

In [70]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, avg

# Create a Spark session
spark = SparkSession.builder.appName('question 5').getOrCreate()

# Load the CSV data into a DataFrame
data = spark.read.csv('file:///C:/Users/Admin/Documents/pyspark/PYSPARK/covid_19_clean_complete.csv', header=True, inferSchema=True)

# Group the data by Date and 'WHO Region' (or Continent) and calculate the average number of deaths by day
avg_deaths_by_continent = data.groupBy(col('Date'), col('WHO Region')) \
    .agg(avg(col('Deaths')).alias('Avg Deaths')) \
    .orderBy(col('Date'), ascending=True)

avg_deaths_by_continent.show()

# Group the data by Date and 'Country/Region' and calculate the average number of deaths by day
avg_deaths_by_country = data.groupBy(col('Date'), col('Country/Region')) \
    .agg(avg(col('Deaths')).alias('Avg Deaths')) \
    .orderBy(col('Date'), ascending=True)

avg_deaths_by_country.show()

# Stop the Spark session
spark.stop()


+----------+--------------------+--------------------+
|      Date|          WHO Region|          Avg Deaths|
+----------+--------------------+--------------------+
|01-02-2020|              Europe|                 0.0|
|01-02-2020|Eastern Mediterra...|                 0.0|
|01-02-2020|            Americas|                 0.0|
|01-02-2020|     Western Pacific|   4.709090909090909|
|01-02-2020|              Africa|                 0.0|
|01-02-2020|     South-East Asia|                 0.0|
|01-03-2020|     Western Pacific|   52.69090909090909|
|01-03-2020|            Americas|0.021739130434782608|
|01-03-2020|Eastern Mediterra...|  2.4545454545454546|
|01-03-2020|              Africa|                 0.0|
|01-03-2020|              Europe|                0.45|
|01-03-2020|     South-East Asia|                 0.1|
|01-04-2020|              Europe|             438.725|
|01-04-2020|     South-East Asia|                23.7|
|01-04-2020|              Africa|              2.6875|
|01-04-202

## 6. Average of cases divided by the number of population of each country (TOP 10)

In [71]:
#import modules
from pyspark.sql import SparkSession  # Import SparkSession to create a Spark application
from pyspark.sql.functions import col, count, sum, avg, max, min, round  # Import functions for DataFrame operations

#create session
spark = SparkSession.builder.appName("question2").getOrCreate()  # Create a Spark session with the name "question2"

#load data
data = spark.read.csv("file:///C:/Users/Admin/Documents/pyspark/PYSPARK/worldometer_data.csv", header=True, inferSchema=True)  # Load CSV data into a DataFrame with header and inferred schema

# Calculate percentage of local infections in India
infected_locally = data.filter(col('Country/Region') == 'India').select(['Country/Region', (col('TotalCases') / col('Population') * 100).alias('Local Infected Percent')]).withColumn('Local Infected Percent', round(col('Local Infected Percent'), 4))  # Round the percentage to 4 decimal places

infected_locally.show()  # Display the results for local infections in India

# Calculate percentage of global infections
infected_globally = data.select([(sum(col('TotalCases')) / sum(col('Population')) * 100).alias('Global Infected Percent')]).withColumn('Global Infected Percent', round(col('Global Infected Percent'), 4))  # Round the percentage to 4 decimal places

infected_globally.show()  # Display the results for global infections

spark.stop()  # Stop the Spark session to release resources


+--------------+----------------------+
|Country/Region|Local Infected Percent|
+--------------+----------------------+
|         India|                0.1466|
+--------------+----------------------+

+-----------------------+
|Global Infected Percent|
+-----------------------+
|                  0.303|
+-----------------------+

