The idea behind the project was to collect COVID-19 statistics for countries around the world and load them into a Neo4j graph. Because most COVID-19 APIs are free, public, and require no API tokens, collecting all the necessary information was straightforward.

In [34]:
import os

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from py2neo import Graph

First, we pull case data for each country (confirmed, deaths, recovered, and critical cases) into a Spark DataFrame. We can inspect the schema with `df.printSchema()`. It shows that the per-country records are nested inside a `data` array, so we need to flatten the data and extract only the fields we want.

In [2]:
# Reuse (or start) the Spark session that backs every DataFrame in this notebook.
spark = (
    SparkSession.builder
    .appName("SecondFrontExample")
    .getOrCreate()
)

headers = {
    'Content-Type': "application/json",
}

# API documentation: https://about-corona.net/documentation
url_country = 'https://corona-api.com/countries'
REQUEST_TIMEOUT_S = 30  # seconds; avoids hanging the cell if the API is unreachable

response = requests.get(url_country, headers=headers, timeout=REQUEST_TIMEOUT_S)
# Fail loudly on an HTTP error instead of letting Spark infer a schema from an error page.
response.raise_for_status()

# spark.read.json accepts an RDD of JSON strings; the whole body is a single JSON document.
rdd = spark.sparkContext.parallelize([response.text])
df = spark.read.json(rdd)
df.printSchema()

root
 |-- _cacheHit: boolean (nullable = true)
 |-- data: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- code: string (nullable = true)
 |    |    |-- coordinates: struct (nullable = true)
 |    |    |    |-- latitude: double (nullable = true)
 |    |    |    |-- longitude: double (nullable = true)
 |    |    |-- latest_data: struct (nullable = true)
 |    |    |    |-- calculated: struct (nullable = true)
 |    |    |    |    |-- cases_per_million_population: long (nullable = true)
 |    |    |    |    |-- death_rate: double (nullable = true)
 |    |    |    |    |-- recovered_vs_death_ratio: string (nullable = true)
 |    |    |    |    |-- recovery_rate: double (nullable = true)
 |    |    |    |-- confirmed: long (nullable = true)
 |    |    |    |-- critical: long (nullable = true)
 |    |    |    |-- deaths: long (nullable = true)
 |    |    |    |-- recovered: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    

In [58]:
# One row per country: each element of the `data` array becomes its own `data` struct.
df2 = df.select(explode('data').alias('data'))
df2.printSchema()
df2.createOrReplaceTempView('calculated')

# Nested fields promoted to top-level columns. Each selected column is named after
# its last path segment (code, name, ..., recovered), which later cells read by name.
country_fields = [
    'data.code',
    'data.name',
    'data.population',
    'data.latest_data.calculated.death_rate',
    'data.latest_data.calculated.recovery_rate',
    'data.latest_data.confirmed',
    'data.latest_data.critical',
    'data.latest_data.deaths',
    'data.latest_data.recovered',
]

# Preliminary data cleaning: drop every country with a null in any selected column.
df3 = df2.select(*country_fields).dropna()
df3.show()

root
 |-- data: struct (nullable = true)
 |    |-- code: string (nullable = true)
 |    |-- coordinates: struct (nullable = true)
 |    |    |-- latitude: double (nullable = true)
 |    |    |-- longitude: double (nullable = true)
 |    |-- latest_data: struct (nullable = true)
 |    |    |-- calculated: struct (nullable = true)
 |    |    |    |-- cases_per_million_population: long (nullable = true)
 |    |    |    |-- death_rate: double (nullable = true)
 |    |    |    |-- recovered_vs_death_ratio: string (nullable = true)
 |    |    |    |-- recovery_rate: double (nullable = true)
 |    |    |-- confirmed: long (nullable = true)
 |    |    |-- critical: long (nullable = true)
 |    |    |-- deaths: long (nullable = true)
 |    |    |-- recovered: long (nullable = true)
 |    |-- name: string (nullable = true)
 |    |-- population: long (nullable = true)
 |    |-- today: struct (nullable = true)
 |    |    |-- confirmed: long (nullable = true)
 |    |    |-- deaths: long (nullable =

In [63]:
# Connection settings are read from the environment so credentials are not embedded
# in the notebook; the fallbacks reproduce the previously hard-coded local values.
neo4j_uri = os.environ.get("NEO4J_URI", "bolt://172.18.0.2")
neo4j_auth = (
    os.environ.get("NEO4J_USER", "neo4j"),
    os.environ.get("NEO4J_PASSWORD", "test"),
)
graph = Graph(neo4j_uri, auth=neo4j_auth)

# One parameter map per country; the keys are the df3 column names.
country_rows = [row.asDict() for row in df3.collect()]

# Load every country in one query.
# - Country is keyed on its code, so re-running the cell updates the node in place
#   instead of creating a duplicate whenever name or population changes.
# - Each statistic node is MERGEd through its own country's relationship, so two
#   countries that share a value (e.g. 0 deaths) no longer share one node.
# - SET refreshes the numbers on re-run rather than leaving stale nodes behind.
# - CriticalCase and RecoveredCase were previously created without any relationship;
#   they are now attached to their country.
LOAD_COUNTRIES = '''
UNWIND $rows AS row
MERGE (c:Country {id: row.code})
SET c.name = row.name, c.population = row.population
MERGE (c)-[:HAS_CASES]->(ca:Cases)
SET ca.number = row.confirmed,
    ca.deathRate = row.death_rate,
    ca.recoveryRate = row.recovery_rate
MERGE (c)-[:HAS_DEATHS]->(d:Deaths)
SET d.number = row.deaths
MERGE (c)-[:HAS_CRITICAL_CASES]->(cr:CriticalCase)
SET cr.number = row.critical
MERGE (c)-[:HAS_RECOVERED_CASES]->(rc:RecoveredCase)
SET rc.number = row.recovered
'''

tx = graph.begin()
try:
    tx.run(LOAD_COUNTRIES, parameters={'rows': country_rows})
except Exception:
    # Don't leave a half-written transaction open on the server.
    tx.rollback()
    raise
tx.commit()

<py2neo.database.work.TransactionSummary at 0x7f67e7776fd0>