In [59]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, date_format, year, month, dayofmonth, concat, lower
import datetime as dt
import logging

In [60]:
logging.basicConfig(level=logging.ERROR,
                    format='%(asctime)s:%(funcName)s:%(levelname)s:%(message)s')
def createSparkSession():
    try:
        spark = SparkSession.\
                            builder.\
                            appName("Spark ETL").\
                            master("spark://spark-master:7077").\
                            config("spark.jars", "mysql-connector-j-8.0.33.jar").\
                            config("hive.metastore.uris", "thrift://hive-server:9083").\
                            enableHiveSupport().\
                            getOrCreate()
        logging.info("SparkSession was be created successfully")
    except Exception:
        logging.error("Fail to create SparkSession")
    return spark

In [61]:
spark = createSparkSession()

2023-10-01 16:11:44,571:createSparkSession:INFO:SparkSession was be created successfully


## EL MySQL to Datalake

### Extract

In [62]:
def extract_covid19_timeseries_mysql(spark):
    try:
        df_covid19_timeseries = spark.read \
                                     .format("jdbc") \
                                     .option("driver", "com.mysql.cj.jdbc.Driver") \
                                     .option("url", "jdbc:mysql://c-mysql:3306/covid19") \
                                     .option("dbtable", "covid19_timeseries") \
                                     .option("user", "root") \
                                     .option("password", "123") \
                                     .load()
        logging.info("Read covid19_timeseries from mysql successfully")
    except Exception:
        logging.warning("Couldn't read covid19_timeseries from mysql")
        
    return df_covid19_timeseries
def extract_worldometer_mysql(spark):
    try:
        df_worldometer = spark.read \
                              .format("jdbc") \
                              .option("driver", "com.mysql.cj.jdbc.Driver") \
                              .option("url", "jdbc:mysql://c-mysql:3306/covid19") \
                              .option("dbtable", "worldometer") \
                              .option("user", "root") \
                              .option("password", "123") \
                              .load()
        logging.info("Read worldometer from mysql successfully")
    except Exception:
        logging.warning("Couldn't read worldometer from mysql")
    return df_worldometer


In [63]:
extract_covid19_timeseries_mysql(spark).show(5)

2023-10-01 16:48:11,373:extract_covid19_timeseries_mysql:INFO:Read covid19_timeseries from mysql successfully


+-----------+--------+-------+-------------------+---------+------+---------+------+--------------------+----+
|    country|    lat_|  long_|               date|confirmed|deaths|recovered|active|          who_region|uuid|
+-----------+--------+-------+-------------------+---------+------+---------+------+--------------------+----+
|Afghanistan| 33.9391|  67.71|2020-01-22 00:00:00|        0|     0|        0|     0|Eastern Mediterra...|   1|
|    Albania| 41.1533|20.1683|2020-01-22 00:00:00|        0|     0|        0|     0|              Europe|   2|
|    Algeria| 28.0339| 1.6596|2020-01-22 00:00:00|        0|     0|        0|     0|              Africa|   3|
|    Andorra| 42.5063| 1.5218|2020-01-22 00:00:00|        0|     0|        0|     0|              Europe|   4|
|     Angola|-11.2027|17.8739|2020-01-22 00:00:00|        0|     0|        0|     0|              Africa|   5|
+-----------+--------+-------+-------------------+---------+------+---------+------+--------------------+----+
o

In [64]:
extract_worldometer_mysql(spark).show(5)

2023-10-01 16:48:13,671:extract_worldometer_mysql:INFO:Read worldometer from mysql successfully


+----+----+--------------+----------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------+-------+-----------+---------------------------+
|rank|cca3|       country|         capital|continent|2022_population|2020_population|2015_population|2010_population|2000_population|1990_population|1980_population|1970_population|     area|density|growth_rate|world_population_percentage|
+----+----+--------------+----------------+---------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------------+---------+-------+-----------+---------------------------+
|  36| AFG|   Afghanistan|           Kabul|     Asia|       41128771|       38972230|       33753499|       28189672|       19542982|       10694796|       12486631|       10752971| 652230.0|63.0587|     1.0257|                       0.52|
| 138| ALB|       Albania|          Tira

### Load

In [65]:
def load_to_datalake(raw_data, name_table):
    try:
        raw_data.write \
         .format("parquet") \
         .mode("overwrite") \
         .save(f"hdfs://hadoop-master:9000/datalake/{name_table}")
        logging.info(f"load {name_table} to datalake successfully")
    except Exception:
        logging.warning(f"Couldn't load {name_table} to datalake")
    

In [66]:
load_to_datalake(extract_covid19_timeseries_mysql(spark) \
                                    , "covid19_timeseries")
load_to_datalake(extract_worldometer_mysql(spark) \
                                    , "worldometer")

2023-10-01 16:48:13,902:extract_covid19_timeseries_mysql:INFO:Read covid19_timeseries from mysql successfully
2023-10-01 16:48:15,423:load_to_datalake:INFO:load covid19_timeseries to datalake successfully
2023-10-01 16:48:15,450:extract_worldometer_mysql:INFO:Read worldometer from mysql successfully
2023-10-01 16:48:15,712:load_to_datalake:INFO:load worldometer to datalake successfully


## ETL Datalake to Data Warehouse

### Extract

In [67]:
def extract_datalake(spark, name_table):
    try:
        df = spark.read \
                  .format("parquet") \
                  .load(f"hdfs://hadoop-master:9000/datalake/{name_table}")
        logging.info(f"Read {name_table} from datalake successfully")
    except Exception:
        logging.warning(f"Couldn't read {name_table} from datalake")
    return df

In [68]:
df_covid19_timeseries = extract_datalake(spark, "covid19_timeseries")
df_worldometer = extract_datalake(spark, "worldometer")

2023-10-01 16:48:15,852:extract_datalake:INFO:Read covid19_timeseries from datalake successfully
2023-10-01 16:48:15,951:extract_datalake:INFO:Read worldometer from datalake successfully


### Transform

In [69]:
def transform(df_covid19_timeseries, df_worldometer):
    cleaned_worldometer = df_worldometer.select(col("country") \
                                           , col("continent") \
                                           , col("2020_population"))

    
    cleaned_covid19_timeseries = df_covid19_timeseries \
                            .filter((col("confirmed") != 0) \
                                    | (col("deaths") != 0) \
                                    | (col("active") != 0))
    cleaned_covid19_timeseries = cleaned_covid19_timeseries.withColumn("date", col("date") \
                                           .cast("date"))
    cleaned_covid19_timeseries = cleaned_covid19_timeseries.withColumn("date_id" \
                                           , date_format(col("date"), "yyyyMMdd"))
    cleaned_covid19_timeseries = cleaned_covid19_timeseries.withColumn("year", year(col("date"))) \
                               .withColumn("month", month(col("date"))) \
                               .withColumn("day", dayofmonth(col("date")))
    cleaned_covid19_timeseries = cleaned_covid19_timeseries.drop("date")
    cleaned_covid19_timeseries = cleaned_covid19_timeseries.withColumn("who_id", lower(col("who_region")).substr(0,3))
    
    cleaned_data = cleaned_covid19_timeseries.join(cleaned_worldometer, "country", "inner")
    cleaned_data = cleaned_data.withColumn("pop_loc_id", lower(concat(col("country").substr(0,2) \
                           , col("lat_").cast("int") \
                           , col("long_").cast("int"))))
    return cleaned_data
def transform_worldometer(df_worldometer):
    cleaned_data = df_worldometer.select(col("country") \
                                        , col("continent") \
                                        , col("population"))
    return cleaned_data

In [70]:
cleaned_data = transform(df_covid19_timeseries, df_worldometer)

#### Dimesional data modeling with star schema

In [71]:
def create_fact_table(cleaned_data):
    fact_covid = cleaned_data.select(col("uuid") \
                                    , col("date_id") \
                                    , col("who_id") \
                                    , col("pop_loc_id") \
                                    , col("confirmed") \
                                    , col("deaths") \
                                    , col("recovered") \
                                    , col("active"))
    return fact_covid
def create_dimensional_table(cleaned_data):
    dim_date = cleaned_data.select(col("date_id") \
                                     , col("year") \
                                     , col("month") \
                                     , col("day")) \
                        .dropDuplicates()
    
    dim_who_region = cleaned_data.select(col("who_id") \
                                  , col("who_region")) \
                             .dropDuplicates()
    
    dim_location = cleaned_data.select(col("pop_loc_id") \
                                  , col("country") \
                                  , col("continent") \
                                  , col("lat_") \
                                  , col("long_")) \
                             .dropDuplicates()
    
    dim_population = cleaned_data.select(col("pop_loc_id") \
                                  , col("2020_population")) \
                             .dropDuplicates()
    
    return dim_date, dim_who_region, dim_location, dim_population

In [72]:
fact_covid = create_fact_table(cleaned_data)

In [73]:
dim_date, dim_who_region, dim_location, dim_population = create_dimensional_table(cleaned_data)

In [53]:
fact_covid.show(5)

+----+--------+------+----------+---------+------+---------+------+
|uuid| date_id|who_id|pop_loc_id|confirmed|deaths|recovered|active|
+----+--------+------+----------+---------+------+---------+------+
|  49|20200122|   wes|   ch31117|        1|     0|        0|     1|
|  50|20200122|   wes|   ch40116|       14|     0|        0|    14|
|  51|20200122|   wes|   ch30107|        6|     0|        0|     6|
|  52|20200122|   wes|   ch26117|        1|     0|        0|     1|
|  54|20200122|   wes|   ch23113|       26|     0|        0|    26|
+----+--------+------+----------+---------+------+---------+------+
only showing top 5 rows



In [54]:
dim_date.show(5)

+--------+----+-----+---+
| date_id|year|month|day|
+--------+----+-----+---+
|20200127|2020|    1| 27|
|20200522|2020|    5| 22|
|20200311|2020|    3| 11|
|20200420|2020|    4| 20|
|20200228|2020|    2| 28|
+--------+----+-----+---+
only showing top 5 rows



In [55]:
dim_who_region.show(5)

+------+--------------------+
|who_id|          who_region|
+------+--------------------+
|   eas|Eastern Mediterra...|
|   ame|            Americas|
|   afr|              Africa|
|   eur|              Europe|
|   sou|     South-East Asia|
+------+--------------------+
only showing top 5 rows



In [56]:
dim_location.show(5)

+----------+-----------+-------------+--------+--------+
|pop_loc_id|    country|    continent|    lat_|   long_|
+----------+-----------+-------------+--------+--------+
|  au-33151|  Australia|      Oceania|-33.8688| 151.209|
|    es5825|    Estonia|       Europe| 58.5953| 25.0136|
|  ca53-127|     Canada|North America| 53.7267|-127.648|
|    fr3-53|     France|       Europe|  3.9339|-53.1258|
|   sa13-60|Saint Lucia|North America| 13.9094|-60.9789|
+----------+-----------+-------------+--------+--------+
only showing top 5 rows



In [None]:
dim_population.show(5)

+----------+---------------+
|pop_loc_id|2020_population|
+----------+---------------+
|   ch40116|     1424929781|
|  au-33151|       25670051|
|    gu4-58|         797202|
|   fr14-61|       64480053|
|   ch37112|     1424929781|
+----------+---------------+
only showing top 5 rows



In [74]:
def load_to_warehouse(fact_covid, dim_date, dim_who_region, dim_location, dim_population):
    fact_covid = fact_covid.repartition(3)
    dim_date = dim_date.repartition(3)
    dim_who_region = dim_who_region.repartition(3)
    dim_location = dim_location.repartition(3)
    dim_population = dim_population.repartition(3)
    spark.sql("create database covid19;")
    fact_covid.write \
              .format("hive") \
              .mode("overwrite") \
              .saveAsTable("covid19.fact_covid")
    dim_date.write \
              .format("hive") \
              .mode("overwrite") \
              .saveAsTable("covid19.dim_date")
    dim_who_region.write \
              .format("hive") \
              .mode("overwrite") \
              .saveAsTable("covid19.dim_who_region")
    dim_location.write \
              .format("hive") \
              .mode("overwrite") \
              .saveAsTable("covid19.dim_location")
    dim_population.write \
              .format("hive") \
              .mode("overwrite") \
              .saveAsTable("covid19.dim_population")

In [None]:
load_to_warehouse(fact_covid, dim_date, dim_who_region, dim_location, dim_population)

In [76]:
print("Extracting data from mysql...")
df_covid19_timeseries = extract_covid19_timeseries_mysql(spark)
df_worldometer = extract_worldometer_mysql(spark)
print("Loading raw data to datalake...")
load_to_datalake(df_covid19_timeseries, "covid19_timeseries")
load_to_datalake(df_worldometer, "worldometer")
print("Running ETL process...")
raw_covid19_timeseries = extract_datalake(spark, "covid19_timeseries")
raw_worldometer = extract_datalake(spark, "worldometer")
cleaned_data = transform(raw_covid19_timeseries, raw_worldometer)
fact_covid = create_fact_table(cleaned_data)
dim_date, dim_who_region, dim_location, dim_population = create_dimensional_table(cleaned_data)
load_to_warehouse(fact_covid, dim_date, dim_who_region, dim_location, dim_population)
print("Finish ETL Process")

2023-10-01 16:51:24,100:extract_covid19_timeseries_mysql:INFO:Read covid19_timeseries from mysql successfully
2023-10-01 16:51:24,144:extract_worldometer_mysql:INFO:Read worldometer from mysql successfully


Extracting data from mysql...
Loading raw data to datalake...


2023-10-01 16:51:25,429:load_to_datalake:INFO:load covid19_timeseries to datalake successfully
2023-10-01 16:51:25,694:load_to_datalake:INFO:load worldometer to datalake successfully
2023-10-01 16:51:25,785:extract_datalake:INFO:Read covid19_timeseries from datalake successfully
2023-10-01 16:51:25,862:extract_datalake:INFO:Read worldometer from datalake successfully


Running ETL process...


Py4JJavaError: An error occurred while calling o640.saveAsTable.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 39.0 failed 4 times, most recent failure: Lost task 2.3 in stage 39.0 (TID 40) (172.18.0.14 executor 9): org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Notebook/spark-warehouse/covid19.db/fact_covid/.hive-staging_hive_2023-10-01_16-51-29_476_7586316276415486649-1/-ext-10000/_temporary/0/_temporary/attempt_202310011651316542452005649331396_0039_m_000002_40 (exists=false, cwd=file:/opt/bitnami/spark/work/app-20230929025056-0002/9)
	at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:274)
	at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:148)
	at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:106)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:389)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Notebook/spark-warehouse/covid19.db/fact_covid/.hive-staging_hive_2023-10-01_16-51-29_476_7586316276415486649-1/-ext-10000/_temporary/0/_temporary/attempt_202310011651316542452005649331396_0039_m_000002_40 (exists=false, cwd=file:/opt/bitnami/spark/work/app-20230929025056-0002/9)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1081)
	at org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter(HiveIgnoreKeyTextOutputFormat.java:81)
	at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
	at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:271)
	... 20 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeWrite$4(FileFormatWriter.scala:307)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.writeAndCommit(FileFormatWriter.scala:271)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeWrite(FileFormatWriter.scala:304)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:190)
	at org.apache.spark.sql.hive.execution.SaveAsHiveFile.saveAsHiveFile(SaveAsHiveFile.scala:50)
	at org.apache.spark.sql.hive.execution.SaveAsHiveFile.saveAsHiveFile$(SaveAsHiveFile.scala:34)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.saveAsHiveFile(InsertIntoHiveTable.scala:71)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.processInsert(InsertIntoHiveTable.scala:143)
	at org.apache.spark.sql.hive.execution.InsertIntoHiveTable.run(InsertIntoHiveTable.scala:105)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
	at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:382)
	at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:354)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.hive.execution.CreateHiveTableAsSelectCommand.run(CreateHiveTableAsSelectCommand.scala:84)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:75)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:73)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:84)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:118)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:195)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:103)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:827)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
	at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:94)
	at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:104)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:512)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:31)
	at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:488)
	at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:94)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:81)
	at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:133)
	at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:856)
	at org.apache.spark.sql.DataFrameWriter.createTable(DataFrameWriter.scala:697)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:675)
	at org.apache.spark.sql.DataFrameWriter.saveAsTable(DataFrameWriter.scala:570)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Notebook/spark-warehouse/covid19.db/fact_covid/.hive-staging_hive_2023-10-01_16-51-29_476_7586316276415486649-1/-ext-10000/_temporary/0/_temporary/attempt_202310011651316542452005649331396_0039_m_000002_40 (exists=false, cwd=file:/opt/bitnami/spark/work/app-20230929025056-0002/9)
	at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:274)
	at org.apache.spark.sql.hive.execution.HiveOutputWriter.<init>(HiveFileFormat.scala:148)
	at org.apache.spark.sql.hive.execution.HiveFileFormat$$anon$1.newInstance(HiveFileFormat.scala:106)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.newOutputWriter(FileFormatDataWriter.scala:161)
	at org.apache.spark.sql.execution.datasources.SingleDirectoryDataWriter.<init>(FileFormatDataWriter.scala:146)
	at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:389)
	at org.apache.spark.sql.execution.datasources.WriteFilesExec.$anonfun$doExecuteWrite$1(WriteFiles.scala:100)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:888)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:888)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.io.IOException: Mkdirs failed to create file:/home/jovyan/work/Notebook/spark-warehouse/covid19.db/fact_covid/.hive-staging_hive_2023-10-01_16-51-29_476_7586316276415486649-1/-ext-10000/_temporary/0/_temporary/attempt_202310011651316542452005649331396_0039_m_000002_40 (exists=false, cwd=file:/opt/bitnami/spark/work/app-20230929025056-0002/9)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:515)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:500)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1195)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:1081)
	at org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat.getHiveRecordWriter(HiveIgnoreKeyTextOutputFormat.java:81)
	at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getRecordWriter(HiveFileFormatUtils.java:286)
	at org.apache.hadoop.hive.ql.io.HiveFileFormatUtils.getHiveRecordWriter(HiveFileFormatUtils.java:271)
	... 20 more
