In [None]:
!pip install pyspark

In [None]:
!pip install findspark

In [None]:
!pip install pandas

In [None]:
import findspark
# findspark.init() locates the Spark installation and puts pyspark on sys.path,
# so it has to run before the pyspark imports below.
findspark.init()
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, DateType
import pandas as pd

# Build (or reuse) the notebook's session. Arrow is enabled for the
# pandas -> Spark conversion done by createDataFrame later on.
spark = (
    SparkSession.builder
    .appName("COVID-19 Data Analysis")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .getOrCreate()
)

# getOrCreate() either returns a SparkSession or raises, so the previous
# "not active" else-branch was unreachable. Fail loudly if that ever changes.
assert isinstance(spark, SparkSession), "SparkSession could not be created."
print("SparkSession is active and ready to use.")

In [None]:
# Source CSV: an Our World in Data COVID-19 "latest" snapshot (per the file name),
# hosted on the course's object storage.
VACCINATION_DATA_URL = (
    "https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/"
    "KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv"
)
vaccination_data = pd.read_csv(VACCINATION_DATA_URL)

In [None]:
print("Displaying the first 5 records of the vaccination data:")
# The five columns the rest of the notebook works with.
columns_to_display = ['continent', 'total_cases', 'total_deaths', 'total_vaccinations', 'population']
first_rows = vaccination_data.loc[:, columns_to_display].head(5)
print(first_rows)

In [None]:
# Target Spark schema for the columns analysed below. Every field is nullable,
# so missing values can be carried through as real nulls.
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

# Work on a copy restricted to the schema's columns, so the raw pandas frame
# shown earlier is not mutated and this cell can be re-run safely.
vaccination_clean = vaccination_data[schema.fieldNames()].copy()

# astype(str) used to turn missing continents into the literal string "nan";
# keep them as None so Spark stores a proper null instead.
vaccination_clean['continent'] = [
    str(value) if pd.notna(value) else None
    for value in vaccination_clean['continent']
]

# Count columns: missing values are filled with 0 (as before) and cast to int64
# so they map directly onto LongType.
count_columns = [field.name for field in schema.fields if isinstance(field.dataType, LongType)]
vaccination_clean[count_columns] = vaccination_clean[count_columns].fillna(0).astype('int64')

# Pass the schema explicitly; previously it was built but never used.
spark_df = spark.createDataFrame(vaccination_clean, schema=schema)
spark_df.show()

In [None]:
# Inspect the column names and types Spark holds for spark_df.
print("Schema of the Spark DataFrame:")
spark_df.printSchema()

In [None]:
# Same five columns as the pandas preview, now read from the Spark DataFrame.
columns_to_display = [
    'continent',
    'total_cases',
    'total_deaths',
    'total_vaccinations',
    'population',
]
spark_df.select(*columns_to_display).show(n=5)

In [None]:
print("Displaying the 'continent' and 'total_cases' columns:")
spark_df.select(spark_df.continent, spark_df.total_cases).show(n=5)

In [None]:
print("Filtering records where 'total_cases' is greater than 1,000,000:")
# Row-level threshold on cumulative cases.
case_threshold = 1_000_000
spark_df.where(spark_df.total_cases > case_threshold).show(n=5)

In [None]:
from pyspark.sql import functions as F

# Deaths as a percentage of population. Missing populations were filled with 0
# upstream, so guard the division: rows with population <= 0 get null instead
# of dividing by zero (which raises when Spark runs with ANSI mode enabled).
death_pct = F.when(
    F.col('population') > 0,
    F.col('total_deaths') / F.col('population') * 100,
)

# Build the display string ("x.xx%") in one step rather than overwriting a
# numeric column with a string column of the same name.
spark_df_with_percentage = spark_df.withColumn(
    'death_percentage',
    F.concat(F.format_number(death_pct, 2), F.lit('%')),
)
columns_to_display = ['total_deaths', 'population', 'death_percentage', 'continent', 'total_vaccinations', 'total_cases']
spark_df_with_percentage.select(columns_to_display).show(5)

In [None]:
print("Calculating the total deaths per continent:")
# Dict-style agg keeps the generated result column name unchanged.
# NOTE(review): continent is nullable, so rows without a continent form their
# own group here — confirm whether they should be excluded from the totals.
deaths_per_continent = spark_df.groupBy('continent')
deaths_per_continent.agg({"total_deaths": "SUM"}).show()

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, LongType


def convert_total_deaths(total_deaths):
    """Return ``total_deaths`` doubled, or None when the input is null.

    Registered below as the Spark SQL UDF ``convert_total_deaths``. The return
    type is LongType to match the LongType ``total_deaths`` column; a 32-bit
    IntegerType cannot represent doubled values beyond the int32 range.
    """
    # The column is declared nullable; None * 2 would raise TypeError.
    if total_deaths is None:
        return None
    return total_deaths * 2


# Keep the returned UDF handle so the cell does not echo the function repr.
convert_total_deaths_udf = spark.udf.register("convert_total_deaths", convert_total_deaths, LongType())

In [None]:
# Expose spark_df to SQL as a session-scoped temp view. createOrReplaceTempView
# is idempotent on re-run and avoids issuing DROP VIEW, which could also remove
# a persistent catalog view named data_v if no temp view shadowed it.
spark_df.createOrReplaceTempView('data_v')
spark.sql('SELECT continent, total_deaths, convert_total_deaths(total_deaths) as converted_total_deaths FROM data_v').show()

In [None]:
# Full contents of the registered temp view (first 20 rows).
spark.table('data_v').show()

In [None]:
print("Displaying continent with total vaccinated more than 1 million:")
# The threshold is applied per row, so without DISTINCT each continent is listed
# once for every qualifying row. ORDER BY makes the listing deterministic.
spark.sql(
    "SELECT DISTINCT continent FROM data_v "
    "WHERE total_vaccinations > 1000000 "
    "ORDER BY continent"
).show()