In [4]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import os
import configparser
import pandas as pd

In [5]:
# Set Postgres parameters
#
# Connection settings are read from the [postgresql] section of parameters.cfg
# so that credentials are not hard-coded in the notebook.
CONFIG_PATH = "parameters.cfg"

config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed; fail loudly here instead of hitting a confusing
# KeyError on the section lookup below.
if not config.read(CONFIG_PATH):
    raise FileNotFoundError(
        "Could not read configuration file: {}".format(CONFIG_PATH)
    )

db_prop = config['postgresql']
db_url = db_prop['url']
db_user = db_prop['username']
db_password = db_prop['password']

# Same settings collected in a single dict.
# NOTE(review): the JDBC writes in this notebook pass the login under the
# option name "user", while this dict uses the key "username" -- confirm the
# expected key before handing db_properties to a JDBC reader/writer.
db_properties = {
    'username': db_user,
    'password': db_password,
    'url': db_url,
}

In [6]:
def create_spark_session():
    """
    Description: Create a Spark session, or return the existing one if a
    session has already been started.

    Arguments:
        None

    Returns:
        spark: Spark session
    """
    return SparkSession.builder.getOrCreate()


# Module-level session shared by the data-loading cells.
spark = create_spark_session()

In [8]:
# get filepath to covid data files (glob pattern matching every CSV in the folder)
input_data = "csse_covid_19_daily_reports/"
covid_data = os.path.join(input_data, "*.csv")

# read covid data; header=True takes the column names from the first line of the files
print("Reading covid data file")
df = spark.read.csv(covid_data, header=True)

# write the loaded rows to Postgres, replacing any existing fact_covid table
print("Writing covid data to Postgres")
(
    df.write
    .mode("overwrite")
    .format("jdbc")
    .option("url", db_url)
    .option("dbtable", "fact_covid")
    .option("user", db_user)
    .option("password", db_password)
    .save()
)

Reading covid data file
Writing covid data to Postgres


In [9]:
# Load the countries CSV and store it in Postgres as dim_countries

print("Creating countries table")

# Comma-separated file; the first row supplies the column names
df_countries = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=",")
    .load("countries.csv")
)

# Replace any existing dim_countries table with the loaded rows
(
    df_countries.write
    .format("jdbc")
    .options(
        url=db_url,
        dbtable="dim_countries",
        user=db_user,
        password=db_password,
    )
    .mode("overwrite")
    .save()
)

Creating countries table


In [10]:
# Load the economic exposure CSV and store it in Postgres as dim_exposure

print("Creating exposure table")

# Semicolon-separated file; the first row supplies the column names
df_exposure = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=";")
    .load("exposure.csv")
)

# Replace any existing dim_exposure table with the loaded rows
(
    df_exposure.write
    .format("jdbc")
    .options(
        url=db_url,
        dbtable="dim_exposure",
        user=db_user,
        password=db_password,
    )
    .mode("overwrite")
    .save()
)

Creating exposure table


In [12]:
# Load the vaccination CSV and store it in Postgres as dim_vaccination

print("Creating vaccination table")

# Comma-separated file; the first row supplies the column names
df_vaccination = (
    spark.read
    .format("csv")
    .options(header="true", delimiter=",")
    .load("vaccination.csv")
)

# Replace any existing dim_vaccination table with the loaded rows
(
    df_vaccination.write
    .format("jdbc")
    .options(
        url=db_url,
        dbtable="dim_vaccination",
        user=db_user,
        password=db_password,
    )
    .mode("overwrite")
    .save()
)

Creating vaccination table
