In [1]:
!pip install pyspark
!pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [22]:
import findspark
findspark.init()

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("PySpark_Exercises").getOrCreate()

print("SparkSession is active and ready to use.")

SparkSession is active and ready to use.


In [23]:
import pandas as pd

# Đọc dữ liệu từ nguồn trực tuyến
url = 'https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/KpHDlIzdtR63BdTofl1mOg/owid-covid-latest.csv'
vaccination_data = pd.read_csv(url)

# Hiển thị 5 dòng đầu tiên
vaccination_data.head()

Unnamed: 0,iso_code,continent,location,last_updated_date,total_cases,new_cases,new_cases_smoothed,total_deaths,new_deaths,new_deaths_smoothed,...,male_smokers,handwashing_facilities,hospital_beds_per_thousand,life_expectancy,human_development_index,population,excess_mortality_cumulative_absolute,excess_mortality_cumulative,excess_mortality,excess_mortality_cumulative_per_million
0,AFG,Asia,Afghanistan,2024-08-04,235214.0,0.0,0.0,7998.0,0.0,0.0,...,,37.746,0.5,64.83,0.511,41128770.0,,,,
1,OWID_AFR,,Africa,2024-08-04,13145380.0,36.0,5.143,259117.0,0.0,0.0,...,,,,,,1426737000.0,,,,
2,ALB,Europe,Albania,2024-08-04,335047.0,0.0,0.0,3605.0,0.0,0.0,...,51.2,,2.89,78.57,0.795,2842318.0,,,,
3,DZA,Africa,Algeria,2024-08-04,272139.0,18.0,2.571,6881.0,0.0,0.0,...,30.4,83.741,1.9,76.88,0.748,44903230.0,,,,
4,ASM,Oceania,American Samoa,2024-08-04,8359.0,0.0,0.0,34.0,0.0,0.0,...,,,,73.74,,44295.0,,,,


In [24]:
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Định nghĩa schema
schema = StructType([
    StructField("continent", StringType(), True),
    StructField("total_cases", LongType(), True),
    StructField("total_deaths", LongType(), True),
    StructField("total_vaccinations", LongType(), True),
    StructField("population", LongType(), True)
])

# Xử lý dữ liệu để đảm bảo kiểu dữ liệu phù hợp
vaccination_data['continent'] = vaccination_data['continent'].astype(str)
vaccination_data['total_cases'] = vaccination_data['total_cases'].fillna(0).astype('int64')
vaccination_data['total_deaths'] = vaccination_data['total_deaths'].fillna(0).astype('int64')
vaccination_data['total_vaccinations'] = vaccination_data['total_vaccinations'].fillna(0).astype('int64')
vaccination_data['population'] = vaccination_data['population'].fillna(0).astype('int64')

# Tạo Spark DataFrame
spark_df = spark.createDataFrame(vaccination_data[schema.fieldNames()])
spark_df.show()

+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|         Asia|     235214|        7998|                 0|  41128772|
|          nan|   13145380|      259117|                 0|1426736614|
|       Europe|     335047|        3605|                 0|   2842318|
|       Africa|     272139|        6881|                 0|  44903228|
|      Oceania|       8359|          34|                 0|     44295|
|       Europe|      48015|         159|                 0|     79843|
|       Africa|     107481|        1937|                 0|  35588996|
|North America|       3904|          12|                 0|     15877|
|North America|       9106|         146|                 0|     93772|
|South America|   10101218|      130663|                 0|  45510324|
|         Asia|     452273|        8777|                 0|   2780472|
|North

In [25]:
spark_df.printSchema()

root
 |-- continent: string (nullable = true)
 |-- total_cases: long (nullable = true)
 |-- total_deaths: long (nullable = true)
 |-- total_vaccinations: long (nullable = true)
 |-- population: long (nullable = true)



In [26]:
spark_df.select("continent","total_cases").show()

+-------------+-----------+
|    continent|total_cases|
+-------------+-----------+
|         Asia|     235214|
|          nan|   13145380|
|       Europe|     335047|
|       Africa|     272139|
|      Oceania|       8359|
|       Europe|      48015|
|       Africa|     107481|
|North America|       3904|
|North America|       9106|
|South America|   10101218|
|         Asia|     452273|
|North America|      44224|
|          nan|  301499099|
|      Oceania|   11861161|
|       Europe|    6082444|
|         Asia|     835757|
|North America|      39127|
|         Asia|     696614|
|         Asia|    2051348|
|North America|     108582|
+-------------+-----------+
only showing top 20 rows



In [27]:
spark_df.filter(spark_df.total_cases > 1000000).show()

+-------------+-----------+------------+------------------+----------+
|    continent|total_cases|total_deaths|total_vaccinations|population|
+-------------+-----------+------------+------------------+----------+
|          nan|   13145380|      259117|                 0|1426736614|
|South America|   10101218|      130663|                 0|  45510324|
|          nan|  301499099|     1637249|        9104304615|4721383370|
|      Oceania|   11861161|       25236|                 0|  26177410|
|       Europe|    6082444|       22534|                 0|   8939617|
|         Asia|    2051348|       29499|                 0| 171186368|
|       Europe|    4872829|       34339|                 0|  11655923|
|South America|    1212147|       22387|                 0|  12224114|
|South America|   37511921|      702116|                 0| 215313504|
|       Europe|    1329988|       38700|                 0|   6781955|
|North America|    4819055|       55282|         102877159|  38454328|
|South

In [28]:
from pyspark.sql.functions import format_number

spark_df_with_percentage = spark_df.withColumn("death_percentage", format_number((spark_df.total_deaths / spark_df.population) * 100, 2))

In [29]:
spark_df_with_percentage.show(5)

+---------+-----------+------------+------------------+----------+----------------+
|continent|total_cases|total_deaths|total_vaccinations|population|death_percentage|
+---------+-----------+------------+------------------+----------+----------------+
|     Asia|     235214|        7998|                 0|  41128772|            0.02|
|      nan|   13145380|      259117|                 0|1426736614|            0.02|
|   Europe|     335047|        3605|                 0|   2842318|            0.13|
|   Africa|     272139|        6881|                 0|  44903228|            0.02|
|  Oceania|       8359|          34|                 0|     44295|            0.08|
+---------+-----------+------------+------------------+----------+----------------+
only showing top 5 rows



In [30]:
spark_df.groupBy("continent").agg({"total_deaths": "sum"}).show()

+-------------+-----------------+
|    continent|sum(total_deaths)|
+-------------+-----------------+
|       Europe|          2102483|
|       Africa|           259117|
|          nan|         22430618|
|North America|          1671178|
|South America|          1354187|
|      Oceania|            32918|
|         Asia|          1637249|
+-------------+-----------------+



In [31]:
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType

In [36]:
def double_deaths(total_deaths):
    return total_deaths * 2

double_deaths_udf = spark.udf.register("double_deaths", double_deaths, IntegerType())

In [37]:
spark_df = spark_df.withColumn("doubled_total_deaths", double_deaths(spark_df.total_deaths))

In [35]:
spark_df.show(5)

+---------+-----------+------------+------------------+----------+--------------------+
|continent|total_cases|total_deaths|total_vaccinations|population|doubled_total_deaths|
+---------+-----------+------------+------------------+----------+--------------------+
|     Asia|     235214|        7998|                 0|  41128772|               15996|
|      nan|   13145380|      259117|                 0|1426736614|              518234|
|   Europe|     335047|        3605|                 0|   2842318|                7210|
|   Africa|     272139|        6881|                 0|  44903228|               13762|
|  Oceania|       8359|          34|                 0|     44295|                  68|
+---------+-----------+------------+------------------+----------+--------------------+
only showing top 5 rows



In [38]:
spark_df.createOrReplaceTempView("data_v")

In [39]:
spark.sql("SELECT * FROM data_v").show(5)

+---------+-----------+------------+------------------+----------+--------------------+
|continent|total_cases|total_deaths|total_vaccinations|population|doubled_total_deaths|
+---------+-----------+------------+------------------+----------+--------------------+
|     Asia|     235214|        7998|                 0|  41128772|               15996|
|      nan|   13145380|      259117|                 0|1426736614|              518234|
|   Europe|     335047|        3605|                 0|   2842318|                7210|
|   Africa|     272139|        6881|                 0|  44903228|               13762|
|  Oceania|       8359|          34|                 0|     44295|                  68|
+---------+-----------+------------+------------------+----------+--------------------+
only showing top 5 rows



In [40]:
spark.sql("SELECT continent, total_vaccinations FROM data_v WHERE total_vaccinations > 1000000").show()

+-------------+------------------+
|    continent|total_vaccinations|
+-------------+------------------+
|          nan|        9104304615|
|North America|         102877159|
|       Europe|          19047108|
|       Europe|           2171623|
|          nan|        1399334208|
|          nan|         951113290|
|          nan|        2840880020|
|         Asia|          21014839|
|         Asia|        2206868000|
|       Europe|           4604865|
|          nan|        4954377376|
|         Asia|          72657288|
|      Oceania|          13398398|
|          nan|        1158547416|
|          nan|          88358812|
|          nan|        5449980941|
|          nan|       13578774356|
+-------------+------------------+

