In [1]:
#Installing packages
import pymongo
import pandas as pd
import json

In [2]:
#Connecting to mongodb
from pymongo import MongoClient
client = MongoClient('localhost',27017)
db = client.mdfinalproject

In [3]:
records = []
with open ('COVID-19Timeseries.json') as k:
    records = json.load(k)

In [4]:
len(records)

21746

In [7]:
#Loading in into the data
collection = db.covid19patients
collection.insert_many(records)

<pymongo.results.InsertManyResult at 0x7fd248649f80>

In [8]:
total_docs = collection.count_documents({})
total_docs

21746

In [9]:
#MongoDB test sample
query = {
    "state": {
        "$regex": 'LA'
    }    
}
cur = collection.find(query)

In [None]:
#Appending the result into the list
a  = []

for i in cur:
    a.append(i)

In [11]:
#Visualizing the data
dat = pd.DataFrame(a)
dat = dat.iloc[:,1:61]
dat.head()

Unnamed: 0,state,date,critical_staffing_shortage_today_yes,critical_staffing_shortage_today_no,critical_staffing_shortage_today_not_reported,critical_staffing_shortage_anticipated_within_week_yes,critical_staffing_shortage_anticipated_within_week_no,critical_staffing_shortage_anticipated_within_week_not_reported,hospital_onset_covid,hospital_onset_covid_coverage,...,inpatient_bed_covid_utilization_numerator,inpatient_bed_covid_utilization_denominator,adult_icu_bed_covid_utilization,adult_icu_bed_covid_utilization_coverage,adult_icu_bed_covid_utilization_numerator,adult_icu_bed_covid_utilization_denominator,adult_icu_bed_utilization,adult_icu_bed_utilization_coverage,adult_icu_bed_utilization_numerator,adult_icu_bed_utilization_denominator
0,LA,8/1/2020,43,142,37,45,139,38,12.0,221,...,42.0,322.0,0.214721,220.0,423.0,1970.0,0.725381,220.0,1429.0,1970.0
1,LA,7/31/2020,44,150,43,46,147,44,10.0,236,...,39.0,322.0,0.22127,235.0,439.0,1984.0,0.721774,235.0,1432.0,1984.0
2,LA,7/30/2020,43,149,44,45,146,45,11.0,236,...,36.0,292.0,0.219624,234.0,432.0,1967.0,0.716319,234.0,1409.0,1967.0
3,LA,7/29/2020,42,161,46,46,156,47,13.0,249,...,39.0,292.0,0.22385,247.0,443.0,1979.0,0.712481,247.0,1410.0,1979.0
4,LA,7/28/2020,44,158,47,46,155,48,15.0,249,...,39.0,292.0,0.232641,247.0,459.0,1973.0,0.727319,247.0,1435.0,1973.0


In [12]:
#ApacheSpark
!pip install pyspark



In [13]:
#Connecting pyspark
from pyspark.sql.types import *
from datetime import datetime

from pyspark.conf import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
from pyspark.sql.types import BooleanType
from pyspark.sql.types import StringType
from pyspark.sql.functions import udf

In [14]:
sc = SparkContext() 
config = sc.getConf()
config.set('spark.cores.max','4')
config.set('spark.executor.memory', '8G')
config.set('spark.driver.maxResultSize', '8g')
config.set('spark.kryoserializer.buffer.max', '512m')
config.set("spark.driver.cores", "4")
sc.stop()

sc = SparkContext(conf = config) 
sqlContext = SQLContext(sc)
    # print("Using Apache Spark Version", sc.version)

In [15]:
#Functions to convert pandas dataframe to spark dataframe

def equivalent_type(f):
      if f == 'datetime64[ns]': return TimestampType()
      elif f == 'int64': return LongType()
      elif f == 'int32': return IntegerType()
      elif f == 'float64': return FloatType()
      else: return StringType()
      
def define_structure(string, format_type):
      try: typo = equivalent_type(format_type)
      except: typo = StringType()
      return StructField(string, typo)
      
def pandas_to_spark(pandas_df):
      columns = list(pandas_df.columns)
      types = list(pandas_df.dtypes)
      struct_list = []
      for column, typo in zip(columns, types): 
        struct_list.append(define_structure(column, typo))
      p_schema = StructType(struct_list)
      return sqlContext.createDataFrame(pandas_df, p_schema)

In [16]:
spark_df = pandas_to_spark(dat)

In [17]:
spark_df.columns

['state',
 'date',
 'critical_staffing_shortage_today_yes',
 'critical_staffing_shortage_today_no',
 'critical_staffing_shortage_today_not_reported',
 'critical_staffing_shortage_anticipated_within_week_yes',
 'critical_staffing_shortage_anticipated_within_week_no',
 'critical_staffing_shortage_anticipated_within_week_not_reported',
 'hospital_onset_covid',
 'hospital_onset_covid_coverage',
 'inpatient_beds',
 'inpatient_beds_coverage',
 'inpatient_beds_used',
 'inpatient_beds_used_coverage',
 'inpatient_beds_used_covid',
 'inpatient_beds_used_covid_coverage',
 'previous_day_admission_adult_covid_confirmed',
 'previous_day_admission_adult_covid_confirmed_coverage',
 'previous_day_admission_adult_covid_suspected',
 'previous_day_admission_adult_covid_suspected_coverage',
 'previous_day_admission_pediatric_covid_confirmed',
 'previous_day_admission_pediatric_covid_confirmed_coverage',
 'previous_day_admission_pediatric_covid_suspected',
 'previous_day_admission_pediatric_covid_suspected_

In [18]:
#test
spark_df.show(5)

+-----+---------+------------------------------------+-----------------------------------+---------------------------------------------+------------------------------------------------------+-----------------------------------------------------+---------------------------------------------------------------+--------------------+-----------------------------+--------------+-----------------------+-------------------+----------------------------+-------------------------+----------------------------------+--------------------------------------------+-----------------------------------------------------+--------------------------------------------+-----------------------------------------------------+------------------------------------------------+---------------------------------------------------------+------------------------------------------------+---------------------------------------------------------+-------------------------------+----------------------------------------+------

In [19]:
#test
cb_rdd = spark_df.select('*').rdd.map(lambda row: [str(row[i]) for i in ['state','date',]])
cb_sdf2 = sqlContext.createDataFrame(cb_rdd,['state','date'])

In [20]:
#test
x = cb_sdf2.filter("state like 'LA'")
x.show(10)

+-----+---------+
|state|     date|
+-----+---------+
|   LA| 8/1/2020|
|   LA|7/31/2020|
|   LA|7/30/2020|
|   LA|7/29/2020|
|   LA|7/28/2020|
|   LA|7/27/2020|
|   LA|7/26/2020|
|   LA|7/25/2020|
|   LA|7/24/2020|
|   LA|7/23/2020|
+-----+---------+
only showing top 10 rows



In [21]:
#test - spark dataframe to pandas dataframe
final_df = x.toPandas()
final_df.head()

Unnamed: 0,state,date
0,LA,8/1/2020
1,LA,7/31/2020
2,LA,7/30/2020
3,LA,7/29/2020
4,LA,7/28/2020


In [None]:
#Flask interactive application using MongoDb and Spark - 
# Input is registered from interactive application
# User input was taken as a search query criterion to pull required data from mongodb
# The data is converted to a spark dataframe in order to run data operations
# Note: Spark can be used for various types of aggregrations, indexing and ML applications for our scenario. In order to avoid run time errors, 
# we have chosen to include just indexing.
# Spark is used to index the data and pull out useful information and convert into spark dataframe to be displayed as an output in the web application

start_time = datetime.now()
from flask import Flask, request, render_template
app = Flask("interactive application")

@app.route('/')
def my_form():
    return render_template("my-form.html")

@app.route('/', methods=['POST'])
def my_form_post():
    val = request.form['userinput']
    query = {
    "state": {
        "$regex": val} }
    cur = collection.find(query)
    a  = []
    for i in cur:
        a.append(i)
    dat = pd.DataFrame(a)
    dat = dat.iloc[:,1:61]

    spark_df = pandas_to_spark(dat)
    cb_rdd = spark_df.select('*').rdd.map(lambda row: [str(row[i]) for i in ['state','date','inpatient_beds','inpatient_beds_used_covid','total_adult_patients_hospitalized_confirmed_and_suspected_covid','percent_of_inpatients_with_covid','adult_icu_bed_utilization']])
    cb_sdf2 = sqlContext.createDataFrame(cb_rdd,['state','date','inpatient_beds','inpatient_beds_used_covid','total_adult_patients_hospitalized_confirmed_and_suspected_covid','percent_of_inpatients_with_covid','adult_icu_bed_utilization'])
    final_df = cb_sdf2.toPandas()
   
    
    return str(final_df)#displaying results on the webpage
end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))

In [23]:
app.run(host='localhost', port=5003)

 * Serving Flask app "interactive application" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


 * Running on http://localhost:5003/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Apr/2021 23:35:16] "[37mGET / HTTP/1.1[0m" 200 -


In [None]:
#Flask interactive application using MongoDb and Pandas
# Similar steps were performed. Spark was replaced with basic pandas dataframe indexing in order to compare perfomances

start_time = datetime.now()
from flask import Flask, request, render_template
app = Flask("interactive application")

@app.route('/')
def my_form():
    return render_template("my-form.html")

@app.route('/', methods=['POST'])
def my_form_post():
    val = request.form['userinput']
    query = {
    "state": {
        "$regex": val} }
    cur = collection.find(query)
    a  = []
    for i in cur:
        a.append(i)
    dat = pd.DataFrame(a)
    dat = dat.loc[:,['state','date','inpatient_beds','inpatient_beds_used_covid','total_adult_patients_hospitalized_confirmed_and_suspected_covid','percent_of_inpatients_with_covid','adult_icu_bed_utilization']]

    return str(dat)#displaying results on the webpage
end_time = datetime.now()
print('Duration: {}'.format(end_time - start_time))

In [None]:
app.run(host='localhost', port=5004)

 * Serving Flask app "interactive application" (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


 * Running on http://localhost:5004/ (Press CTRL+C to quit)
127.0.0.1 - - [17/Apr/2021 21:33:31] "[37mGET / HTTP/1.1[0m" 200 -
127.0.0.1 - - [17/Apr/2021 21:33:31] "[33mGET /favicon.ico HTTP/1.1[0m" 404 -
127.0.0.1 - - [17/Apr/2021 21:33:34] "[37mPOST / HTTP/1.1[0m" 200 -


In [None]:
# Run time for spark interactive application was better than pandas. The difference is not significant in this case because our data is relatively smaller
# Upon running complex aggregrations or ML functions, run time difference would be very evident.